From 4388987812ebd68c65a4b876ac5414a4d2b3f97f Mon Sep 17 00:00:00 2001 From: Jacob Viau Date: Tue, 17 Oct 2023 15:56:18 -0700 Subject: [PATCH 1/8] Update Worker extension to 1.0.4 (#2641) --- release_notes.md | 1 + src/Worker.Extensions.DurableTask/AssemblyInfo.cs | 2 +- .../Worker.Extensions.DurableTask.csproj | 6 +++--- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/release_notes.md b/release_notes.md index 2d1644667..45bef4c6c 100644 --- a/release_notes.md +++ b/release_notes.md @@ -5,6 +5,7 @@ ### Bug Fixes - Fix support for distributed tracing v2 in dotnet-isolated and Java (https://github.com/Azure/azure-functions-durable-extension/pull/2634) +- Update Microsoft.DurableTask.\* dependencies to v1.0.5 ### Breaking Changes diff --git a/src/Worker.Extensions.DurableTask/AssemblyInfo.cs b/src/Worker.Extensions.DurableTask/AssemblyInfo.cs index 9c2e574e4..d455ef9c6 100644 --- a/src/Worker.Extensions.DurableTask/AssemblyInfo.cs +++ b/src/Worker.Extensions.DurableTask/AssemblyInfo.cs @@ -4,4 +4,4 @@ using Microsoft.Azure.Functions.Worker.Extensions.Abstractions; // TODO: Find a way to generate this dynamically at build-time -[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.DurableTask", "2.11.*")] +[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.DurableTask", "2.12.*")] diff --git a/src/Worker.Extensions.DurableTask/Worker.Extensions.DurableTask.csproj b/src/Worker.Extensions.DurableTask/Worker.Extensions.DurableTask.csproj index 4c7aba5e3..f31cc4b9e 100644 --- a/src/Worker.Extensions.DurableTask/Worker.Extensions.DurableTask.csproj +++ b/src/Worker.Extensions.DurableTask/Worker.Extensions.DurableTask.csproj @@ -29,7 +29,7 @@ ..\..\sign.snk - 1.0.3 + 1.0.4 $(VersionPrefix).0 $(VersionPrefix).$(FileVersionRevision) @@ -38,8 +38,8 @@ - - + + From bbd7ad6d3c8917545033b35c9bcb3bf00801625d Mon Sep 17 00:00:00 2001 From: Sebastian Burckhardt Date: Thu, 19 Oct 2023 16:09:37 -0700 Subject: [PATCH 2/8] Merge feature/core-entities to dev (#2574) * udpate readme. * update durability provider class for new core-entities support. (#2570) * update durability provider class for new core-entities support. * add configuration setting for max entity concurrency to DurableTaskOptions * minor fixes. * update DurableClient to take advantage of native entity queries (#2571) * update DurableClient to take advantage of native entity queries if available * fix minor errors. * address PR feedback * implement passthrough middleware for entities (#2572) * implement passthrough middleware for entities. * propagate changes to protocol * update/simplify protobuf format * address PR feedback * implement entity queries for grpc listener (#2573) * implement entity queries for grpc listener * propagate changes to protocol * update/simplify protobuf format * Various fixes (#2585) * durability provider must implement and pass-through IEntityOrchestrationService since it wraps the orchestration service * simple mistake * fix misunderstanding of initializer syntax (produced null, not empty list) * fix missing failure details * fix missing compile-time switch for trigger value type * fix missing optional arguments * fix missing override * simplify how entities are excluded from instance queries (#2586) * add an entity example to the DotNetIsolated smoke test project. (#2584) * add an entity example to the DotNetIsolated smoke test project. * remove superfluous argument. * address PR feedback * Entities: Add worker side entity trigger and logic (#2576) * Add worker side entity trigger and logic * update comments * Address PR comments * another small fix that got lost somewhere. (#2596) * Update packages and version for entities preview (#2599) * Switch to Microsoft.DurableTask.Grpc (#2605) * Fix grpc core (#2616) * pass entity parameters for task orchestration. (#2611) * Core entities/various fixes and updates (#2619) * assign the necessary AzureStorageOrchestrationServiceSettings * propagate changes to query name and metadata parameters * add missing override for TaskOrchestrationEntityFeature * Update to entities preview 2 (#2620) * Add callback handler for entity dispatching (#2624) * Core entities/propagate changes (#2625) * add configuration for EnableEntitySupport * rename includeStateless to includeTransient * Rev dependencies to entities-preview.2 (#2627) * Call EnsureLegalAccess from EntityFeature in dotnet-isolated (#2633) * create a better error message in situations where client entity functions are called on a backend that does not support entities (#2630) * Rev package versions, update release notes (#2638) * Address smoke test build issue (#2647) * fix translation of legacy query to new entity query support (#2648) * fix translation of legacy query to new entity query support * comment out CleanEntityStorage_Many * try to enable CI on feature branch * Revert "comment out CleanEntityStorage_Many" This reverts commit aeaa4b8c7a6502fa85b37ff56785672b0e46edd5. * update to preview.2 packages --------- Co-authored-by: Varshitha Bachu Co-authored-by: Jacob Viau --- azure-pipelines.yml | 5 + release_notes.md | 23 ++- .../AzureStorageDurabilityProvider.cs | 27 ++- .../AzureStorageDurabilityProviderFactory.cs | 15 ++ .../EntityTriggerAttributeBindingProvider.cs | 57 ++++-- ...strationTriggerAttributeBindingProvider.cs | 1 + .../ContextImplementations/DurableClient.cs | 88 +++++++++ .../RemoteEntityContext.cs | 28 +++ .../RemoteOrchestratorContext.cs | 7 +- .../DurabilityProvider.cs | 22 ++- .../DurableEntityStatus.cs | 20 +++ .../DurableTaskExtension.cs | 3 +- .../LocalGrpcListener.cs | 116 ++++++++++++ ...t.Azure.WebJobs.Extensions.DurableTask.xml | 24 +++ .../Options/DurableTaskOptions.cs | 17 ++ .../OutOfProcMiddleware.cs | 168 +++++++++++++++++- .../ProtobufUtils.cs | 159 +++++++++++++++++ .../WebJobs.Extensions.DurableTask.csproj | 11 +- .../AssemblyInfo.cs | 2 +- .../DurableTaskExtensionStartup.cs | 2 + .../DurableTaskFunctionsMiddleware.cs | 68 +++++-- .../EntityTriggerAttribute.cs | 32 ++++ .../FunctionsDurableClientProvider.cs | 1 + .../FunctionsDurableTaskClient.cs | 3 + ...tionsOrchestrationContext.EntityFeature.cs | 58 ++++++ .../FunctionsOrchestrationContext.cs | 23 ++- .../OrchestrationInputConverter.cs | 2 +- .../TaskEntityDispatcher.cs | 104 +++++++++++ .../Worker.Extensions.DurableTask.csproj | 11 +- .../DotNetIsolated/Counter.cs | 149 ++++++++++++++++ .../DotNetIsolated/DotNetIsolated.csproj | 2 +- 31 files changed, 1189 insertions(+), 59 deletions(-) create mode 100644 src/WebJobs.Extensions.DurableTask/ContextImplementations/RemoteEntityContext.cs create mode 100644 src/Worker.Extensions.DurableTask/EntityTriggerAttribute.cs create mode 100644 src/Worker.Extensions.DurableTask/FunctionsOrchestrationContext.EntityFeature.cs create mode 100644 src/Worker.Extensions.DurableTask/TaskEntityDispatcher.cs create mode 100644 test/SmokeTests/OOProcSmokeTests/DotNetIsolated/Counter.cs diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 68ef305bf..5ab030524 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -1,3 +1,8 @@ +pr: + - main + - dev + - feature/* + jobs: - job: FunctionsV1Tests diff --git a/release_notes.md b/release_notes.md index 45bef4c6c..8069fb780 100644 --- a/release_notes.md +++ b/release_notes.md @@ -1,12 +1,31 @@ # Release Notes +## Microsoft.Azure.Functions.Worker.Extensions.DurableTask v1.1.0-preview.1 + ### New Features +- Support entities for .NET isolated + ### Bug Fixes -- Fix support for distributed tracing v2 in dotnet-isolated and Java (https://github.com/Azure/azure-functions-durable-extension/pull/2634) -- Update Microsoft.DurableTask.\* dependencies to v1.0.5 +### Breaking Changes + +### Dependency Updates + +`Microsoft.DurableTask.*` to `1.1.0-preview.1` + +## Microsoft.Azure.WebJobs.Extensions.DurableTask v2.12.0-preview.1 + +### New Features + +- Updates to take advantage of new core-entity support + +### Bug Fixes ### Breaking Changes ### Dependency Updates + +`Microsoft.Azure.DurableTask.Core` to `2.16.0-preview.2` +`Microsoft.Azure.DurableTask.AzureStorage` to `1.16.0-preview.2` + diff --git a/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProvider.cs b/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProvider.cs index 38151e3cc..2920a0f37 100644 --- a/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProvider.cs +++ b/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProvider.cs @@ -9,6 +9,7 @@ using DurableTask.AzureStorage; using DurableTask.AzureStorage.Tracking; using DurableTask.Core; +using DurableTask.Core.Entities; using Microsoft.Extensions.Logging; using Microsoft.WindowsAzure.Storage; using Newtonsoft.Json; @@ -19,6 +20,7 @@ using Microsoft.Azure.WebJobs.Host.Scale; #endif using AzureStorage = DurableTask.AzureStorage; +using DTCore = DurableTask.Core; namespace Microsoft.Azure.WebJobs.Extensions.DurableTask { @@ -54,8 +56,6 @@ public AzureStorageDurabilityProvider( this.logger = logger; } - public override bool SupportsEntities => true; - public override bool CheckStatusBeforeRaiseEvent => true; /// @@ -98,6 +98,29 @@ public async override Task> GetAllOrchestrationStatesW /// public async override Task RetrieveSerializedEntityState(EntityId entityId, JsonSerializerSettings serializerSettings) + { + EntityBackendQueries entityBackendQueries = (this.serviceClient as IEntityOrchestrationService)?.EntityBackendQueries; + + if (entityBackendQueries != null) // entity queries are natively supported + { + var entity = await entityBackendQueries.GetEntityAsync(new DTCore.Entities.EntityId(entityId.EntityName, entityId.EntityKey), cancellation: default); + + if (entity == null) + { + return null; + } + else + { + return entity.Value.SerializedState; + } + } + else // fall back to old implementation + { + return await this.LegacyImplementationOfRetrieveSerializedEntityState(entityId, serializerSettings); + } + } + + private async Task LegacyImplementationOfRetrieveSerializedEntityState(EntityId entityId, JsonSerializerSettings serializerSettings) { var instanceId = EntityId.GetSchedulerIdFromEntityId(entityId); IList stateList = await this.serviceClient.GetOrchestrationStateAsync(instanceId, false); diff --git a/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProviderFactory.cs b/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProviderFactory.cs index c86fd2be1..0162d26b4 100644 --- a/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProviderFactory.cs +++ b/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProviderFactory.cs @@ -19,6 +19,7 @@ internal class AzureStorageDurabilityProviderFactory : IDurabilityProviderFactor private readonly AzureStorageOptions azureStorageOptions; private readonly INameResolver nameResolver; private readonly ILoggerFactory loggerFactory; + private readonly bool useSeparateQueueForEntityWorkItems; private readonly bool inConsumption; // If true, optimize defaults for consumption private AzureStorageDurabilityProvider defaultStorageProvider; @@ -56,6 +57,7 @@ public AzureStorageDurabilityProviderFactory( // different defaults for key configuration values. int maxConcurrentOrchestratorsDefault = this.inConsumption ? 5 : 10 * Environment.ProcessorCount; int maxConcurrentActivitiesDefault = this.inConsumption ? 10 : 10 * Environment.ProcessorCount; + int maxConcurrentEntitiesDefault = this.inConsumption ? 10 : 10 * Environment.ProcessorCount; int maxEntityOperationBatchSizeDefault = this.inConsumption ? 50 : 5000; if (this.inConsumption) @@ -71,9 +73,18 @@ public AzureStorageDurabilityProviderFactory( } } + WorkerRuntimeType runtimeType = platformInfo.GetWorkerRuntimeType(); + if (runtimeType == WorkerRuntimeType.DotNetIsolated || + runtimeType == WorkerRuntimeType.Java || + runtimeType == WorkerRuntimeType.Custom) + { + this.useSeparateQueueForEntityWorkItems = true; + } + // The following defaults are only applied if the customer did not explicitely set them on `host.json` this.options.MaxConcurrentOrchestratorFunctions = this.options.MaxConcurrentOrchestratorFunctions ?? maxConcurrentOrchestratorsDefault; this.options.MaxConcurrentActivityFunctions = this.options.MaxConcurrentActivityFunctions ?? maxConcurrentActivitiesDefault; + this.options.MaxConcurrentEntityFunctions = this.options.MaxConcurrentEntityFunctions ?? maxConcurrentEntitiesDefault; this.options.MaxEntityOperationBatchSize = this.options.MaxEntityOperationBatchSize ?? maxEntityOperationBatchSizeDefault; // Override the configuration defaults with user-provided values in host.json, if any. @@ -188,6 +199,7 @@ internal AzureStorageOrchestrationServiceSettings GetAzureStorageOrchestrationSe WorkItemQueueVisibilityTimeout = this.azureStorageOptions.WorkItemQueueVisibilityTimeout, MaxConcurrentTaskOrchestrationWorkItems = this.options.MaxConcurrentOrchestratorFunctions ?? throw new InvalidOperationException($"{nameof(this.options.MaxConcurrentOrchestratorFunctions)} needs a default value"), MaxConcurrentTaskActivityWorkItems = this.options.MaxConcurrentActivityFunctions ?? throw new InvalidOperationException($"{nameof(this.options.MaxConcurrentOrchestratorFunctions)} needs a default value"), + MaxConcurrentTaskEntityWorkItems = this.options.MaxConcurrentEntityFunctions ?? throw new InvalidOperationException($"{nameof(this.options.MaxConcurrentEntityFunctions)} needs a default value"), ExtendedSessionsEnabled = this.options.ExtendedSessionsEnabled, ExtendedSessionIdleTimeout = extendedSessionTimeout, MaxQueuePollingInterval = this.azureStorageOptions.MaxQueuePollingInterval, @@ -202,6 +214,9 @@ internal AzureStorageOrchestrationServiceSettings GetAzureStorageOrchestrationSe LoggerFactory = this.loggerFactory, UseLegacyPartitionManagement = this.azureStorageOptions.UseLegacyPartitionManagement, UseTablePartitionManagement = this.azureStorageOptions.UseTablePartitionManagement, + UseSeparateQueueForEntityWorkItems = this.useSeparateQueueForEntityWorkItems, + EntityMessageReorderWindowInMinutes = this.options.EntityMessageReorderWindowInMinutes, + MaxEntityOperationBatchSize = this.options.MaxEntityOperationBatchSize, }; if (this.inConsumption) diff --git a/src/WebJobs.Extensions.DurableTask/Bindings/EntityTriggerAttributeBindingProvider.cs b/src/WebJobs.Extensions.DurableTask/Bindings/EntityTriggerAttributeBindingProvider.cs index 3f3e8c02b..8e30157c8 100644 --- a/src/WebJobs.Extensions.DurableTask/Bindings/EntityTriggerAttributeBindingProvider.cs +++ b/src/WebJobs.Extensions.DurableTask/Bindings/EntityTriggerAttributeBindingProvider.cs @@ -9,6 +9,7 @@ using Microsoft.Azure.WebJobs.Host.Listeners; using Microsoft.Azure.WebJobs.Host.Protocols; using Microsoft.Azure.WebJobs.Host.Triggers; +using Newtonsoft.Json; using Newtonsoft.Json.Linq; namespace Microsoft.Azure.WebJobs.Extensions.DurableTask @@ -57,6 +58,8 @@ public EntityTriggerAttributeBindingProvider( private class EntityTriggerBinding : ITriggerBinding { + private static readonly IReadOnlyDictionary EmptyBindingData = new Dictionary(capacity: 0); + private readonly DurableTaskExtension config; private readonly ParameterInfo parameterInfo; private readonly FunctionName entityName; @@ -75,7 +78,10 @@ public EntityTriggerBinding( this.BindingDataContract = GetBindingDataContract(parameterInfo); } - public Type TriggerValueType => typeof(IDurableEntityContext); + // Out-of-proc V2 uses a different trigger value type + public Type TriggerValueType => this.config.OutOfProcProtocol == OutOfProcOrchestrationProtocol.MiddlewarePassthrough ? + typeof(RemoteEntityContext) : + typeof(IDurableEntityContext); public IReadOnlyDictionary BindingDataContract { get; } @@ -95,15 +101,16 @@ private static IReadOnlyDictionary GetBindingDataContract(Paramete public Task BindAsync(object value, ValueBindingContext context) { - var entityContext = (DurableEntityContext)value; - Type destinationType = this.parameterInfo.ParameterType; - - object? convertedValue = null; - if (destinationType == typeof(IDurableEntityContext)) + if (value is DurableEntityContext entityContext) { - convertedValue = entityContext; + Type destinationType = this.parameterInfo.ParameterType; + + object? convertedValue = null; + if (destinationType == typeof(IDurableEntityContext)) + { + convertedValue = entityContext; #if !FUNCTIONS_V1 - ((IDurableEntityContext)value).FunctionBindingContext = context.FunctionContext; + ((IDurableEntityContext)value).FunctionBindingContext = context.FunctionContext; #endif } else if (destinationType == typeof(string)) @@ -111,15 +118,35 @@ public Task BindAsync(object value, ValueBindingContext context) convertedValue = EntityContextToString(entityContext); } - var inputValueProvider = new ObjectValueProvider( - convertedValue ?? value, - this.parameterInfo.ParameterType); + var inputValueProvider = new ObjectValueProvider( + convertedValue ?? value, + this.parameterInfo.ParameterType); - var bindingData = new Dictionary(StringComparer.OrdinalIgnoreCase); - bindingData[this.parameterInfo.Name!] = convertedValue; + var bindingData = new Dictionary(StringComparer.OrdinalIgnoreCase); + bindingData[this.parameterInfo.Name!] = convertedValue; - var triggerData = new TriggerData(inputValueProvider, bindingData); - return Task.FromResult(triggerData); + var triggerData = new TriggerData(inputValueProvider, bindingData); + return Task.FromResult(triggerData); + } +#if FUNCTIONS_V3_OR_GREATER + else if (value is RemoteEntityContext remoteContext) + { + // Generate a byte array which is the serialized protobuf payload + // https://developers.google.com/protocol-buffers/docs/csharptutorial#parsing_and_serialization + var entityBatchRequest = remoteContext.Request.ToEntityBatchRequest(); + + // We convert the binary payload into a base64 string because that seems to be the most commonly supported + // format for Azure Functions language workers. Attempts to send unencoded byte[] payloads were unsuccessful. + string encodedRequest = ProtobufUtils.Base64Encode(entityBatchRequest); + var contextValueProvider = new ObjectValueProvider(encodedRequest, typeof(string)); + var triggerData = new TriggerData(contextValueProvider, EmptyBindingData); + return Task.FromResult(triggerData); + } +#endif + else + { + throw new ArgumentException($"Don't know how to bind to {value?.GetType().Name ?? "null"}.", nameof(value)); + } } public ParameterDescriptor ToParameterDescriptor() diff --git a/src/WebJobs.Extensions.DurableTask/Bindings/OrchestrationTriggerAttributeBindingProvider.cs b/src/WebJobs.Extensions.DurableTask/Bindings/OrchestrationTriggerAttributeBindingProvider.cs index ce1758146..3a277a505 100644 --- a/src/WebJobs.Extensions.DurableTask/Bindings/OrchestrationTriggerAttributeBindingProvider.cs +++ b/src/WebJobs.Extensions.DurableTask/Bindings/OrchestrationTriggerAttributeBindingProvider.cs @@ -170,6 +170,7 @@ public Task BindAsync(object? value, ValueBindingContext context) InstanceId = remoteContext.InstanceId, PastEvents = { remoteContext.PastEvents.Select(ProtobufUtils.ToHistoryEventProto) }, NewEvents = { remoteContext.NewEvents.Select(ProtobufUtils.ToHistoryEventProto) }, + EntityParameters = remoteContext.EntityParameters.ToProtobuf(), }; // We convert the binary payload into a base64 string because that seems to be the most commonly supported diff --git a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableClient.cs b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableClient.cs index 0c347e9aa..7e6709a2c 100644 --- a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableClient.cs +++ b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableClient.cs @@ -10,12 +10,14 @@ using System.Threading; using System.Threading.Tasks; using DurableTask.Core; +using DurableTask.Core.Entities; using DurableTask.Core.History; using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc.WebApiCompatShim; using Newtonsoft.Json; using Newtonsoft.Json.Linq; +using DTCore = DurableTask.Core; namespace Microsoft.Azure.WebJobs.Extensions.DurableTask { @@ -548,6 +550,28 @@ Task> IDurableEntityClient.ReadEntityStateAsync(Entity } private async Task> ReadEntityStateAsync(DurabilityProvider provider, EntityId entityId) + { + if (this.HasNativeEntityQuerySupport(this.durabilityProvider, out var entityBackendQueries)) + { + EntityBackendQueries.EntityMetadata? metaData = await entityBackendQueries.GetEntityAsync( + new DTCore.Entities.EntityId(entityId.EntityName, entityId.EntityKey), + includeState: true, + includeStateless: false, + cancellation: default); + + return new EntityStateResponse() + { + EntityExists = metaData.HasValue, + EntityState = metaData.HasValue ? this.messageDataConverter.Deserialize(metaData.Value.SerializedState) : default, + }; + } + else + { + return await this.ReadEntityStateLegacyAsync(provider, entityId); + } + } + + private async Task> ReadEntityStateLegacyAsync(DurabilityProvider provider, EntityId entityId) { string entityState = await provider.RetrieveSerializedEntityState(entityId, this.messageDataConverter.JsonSettings); @@ -611,6 +635,40 @@ private static EntityQueryResult ConvertToEntityQueryResult(IEnumerable async Task IDurableEntityClient.ListEntitiesAsync(EntityQuery query, CancellationToken cancellationToken) + { + if (this.HasNativeEntityQuerySupport(this.durabilityProvider, out var entityBackendQueries)) + { + var result = await entityBackendQueries.QueryEntitiesAsync( + new EntityBackendQueries.EntityQuery() + { + InstanceIdStartsWith = query.EntityName != null ? $"@{query.EntityName.ToLowerInvariant()}@" : null, + IncludeTransient = query.IncludeDeleted, + IncludeState = query.FetchState, + LastModifiedFrom = query.LastOperationFrom == DateTime.MinValue ? (DateTime?)null : (DateTime?)query.LastOperationFrom, + LastModifiedTo = query.LastOperationTo, + PageSize = query.PageSize, + ContinuationToken = query.ContinuationToken, + }, + cancellationToken); + + return new EntityQueryResult() + { + Entities = result.Results.Select(ConvertEntityMetadata).ToList(), + ContinuationToken = result.ContinuationToken, + }; + + DurableEntityStatus ConvertEntityMetadata(EntityBackendQueries.EntityMetadata metadata) + { + return new DurableEntityStatus(metadata); + } + } + else + { + return await this.ListEntitiesLegacyAsync(query, cancellationToken); + } + } + + private async Task ListEntitiesLegacyAsync(EntityQuery query, CancellationToken cancellationToken) { var condition = new OrchestrationStatusQueryCondition(query); EntityQueryResult entityResult; @@ -633,6 +691,30 @@ async Task IDurableEntityClient.ListEntitiesAsync(EntityQuery /// async Task IDurableEntityClient.CleanEntityStorageAsync(bool removeEmptyEntities, bool releaseOrphanedLocks, CancellationToken cancellationToken) + { + if (this.HasNativeEntityQuerySupport(this.durabilityProvider, out var entityBackendQueries)) + { + var result = await entityBackendQueries.CleanEntityStorageAsync( + new EntityBackendQueries.CleanEntityStorageRequest() + { + RemoveEmptyEntities = removeEmptyEntities, + ReleaseOrphanedLocks = releaseOrphanedLocks, + }, + cancellationToken); + + return new CleanEntityStorageResult() + { + NumberOfEmptyEntitiesRemoved = result.EmptyEntitiesRemoved, + NumberOfOrphanedLocksRemoved = result.OrphanedLocksReleased, + }; + } + else + { + return await this.CleanEntityStorageLegacyAsync(removeEmptyEntities, releaseOrphanedLocks, cancellationToken); + } + } + + private async Task CleanEntityStorageLegacyAsync(bool removeEmptyEntities, bool releaseOrphanedLocks, CancellationToken cancellationToken) { DateTime now = DateTime.UtcNow; CleanEntityStorageResult finalResult = default; @@ -706,6 +788,12 @@ async Task CheckForOrphanedLockAndFixIt(DurableOrchestrationStatus status, strin return finalResult; } + private bool HasNativeEntityQuerySupport(DurabilityProvider provider, out EntityBackendQueries entityBackendQueries) + { + entityBackendQueries = (provider as IEntityOrchestrationService)?.EntityBackendQueries; + return entityBackendQueries != null; + } + private async Task GetOrchestrationInstanceStateAsync(string instanceId) { return await GetOrchestrationInstanceStateAsync(this.client, instanceId); diff --git a/src/WebJobs.Extensions.DurableTask/ContextImplementations/RemoteEntityContext.cs b/src/WebJobs.Extensions.DurableTask/ContextImplementations/RemoteEntityContext.cs new file mode 100644 index 000000000..07ea0e921 --- /dev/null +++ b/src/WebJobs.Extensions.DurableTask/ContextImplementations/RemoteEntityContext.cs @@ -0,0 +1,28 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See LICENSE in the project root for license information. +#nullable enable +using System; +using System.Collections.Generic; +using DurableTask.Core; +using DurableTask.Core.Command; +using DurableTask.Core.Entities.OperationFormat; +using DurableTask.Core.History; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; + +namespace Microsoft.Azure.WebJobs.Extensions.DurableTask +{ + internal class RemoteEntityContext + { + public RemoteEntityContext(EntityBatchRequest batchRequest) + { + this.Request = batchRequest; + } + + [JsonProperty("request")] + public EntityBatchRequest Request { get; private set; } + + [JsonIgnore] + internal EntityBatchResult? Result { get; set; } + } +} diff --git a/src/WebJobs.Extensions.DurableTask/ContextImplementations/RemoteOrchestratorContext.cs b/src/WebJobs.Extensions.DurableTask/ContextImplementations/RemoteOrchestratorContext.cs index 3894256ec..a1d9319e3 100644 --- a/src/WebJobs.Extensions.DurableTask/ContextImplementations/RemoteOrchestratorContext.cs +++ b/src/WebJobs.Extensions.DurableTask/ContextImplementations/RemoteOrchestratorContext.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using DurableTask.Core; using DurableTask.Core.Command; +using DurableTask.Core.Entities; using DurableTask.Core.History; using Newtonsoft.Json; using Newtonsoft.Json.Linq; @@ -17,9 +18,10 @@ internal class RemoteOrchestratorContext private OrchestratorExecutionResult? executionResult; - public RemoteOrchestratorContext(OrchestrationRuntimeState runtimeState) + public RemoteOrchestratorContext(OrchestrationRuntimeState runtimeState, TaskOrchestrationEntityParameters? entityParameters) { this.runtimeState = runtimeState ?? throw new ArgumentNullException(nameof(runtimeState)); + this.EntityParameters = entityParameters; } [JsonProperty("instanceId")] @@ -43,6 +45,9 @@ public RemoteOrchestratorContext(OrchestrationRuntimeState runtimeState) [JsonIgnore] internal string? SerializedOutput { get; private set; } + [JsonIgnore] + internal TaskOrchestrationEntityParameters? EntityParameters { get; private set; } + internal void SetResult(IEnumerable actions, string customStatus) { var result = new OrchestratorExecutionResult diff --git a/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs b/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs index 529a7adab..9adc6cf84 100644 --- a/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs +++ b/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs @@ -6,6 +6,7 @@ using System.Threading; using System.Threading.Tasks; using DurableTask.Core; +using DurableTask.Core.Entities; using DurableTask.Core.History; using DurableTask.Core.Query; using Newtonsoft.Json; @@ -27,7 +28,8 @@ public class DurabilityProvider : IOrchestrationService, IOrchestrationServiceClient, IOrchestrationServiceQueryClient, - IOrchestrationServicePurgeClient + IOrchestrationServicePurgeClient, + IEntityOrchestrationService { internal const string NoConnectionDetails = "default"; @@ -36,6 +38,7 @@ public class DurabilityProvider : private readonly string name; private readonly IOrchestrationService innerService; private readonly IOrchestrationServiceClient innerServiceClient; + private readonly IEntityOrchestrationService entityOrchestrationService; private readonly string connectionName; /// @@ -52,6 +55,7 @@ public DurabilityProvider(string storageProviderName, IOrchestrationService serv this.name = storageProviderName ?? throw new ArgumentNullException(nameof(storageProviderName)); this.innerService = service ?? throw new ArgumentNullException(nameof(service)); this.innerServiceClient = serviceClient ?? throw new ArgumentNullException(nameof(serviceClient)); + this.entityOrchestrationService = service as IEntityOrchestrationService; this.connectionName = connectionName ?? throw new ArgumentNullException(connectionName); } @@ -64,7 +68,7 @@ public DurabilityProvider(string storageProviderName, IOrchestrationService serv /// /// Specifies whether the durability provider supports Durable Entities. /// - public virtual bool SupportsEntities => false; + public virtual bool SupportsEntities => this.entityOrchestrationService?.EntityBackendProperties != null; /// /// Specifies whether the backend's WaitForOrchestration is implemented without polling. @@ -121,6 +125,20 @@ public DurabilityProvider(string storageProviderName, IOrchestrationService serv /// public int MaxConcurrentTaskActivityWorkItems => this.GetOrchestrationService().MaxConcurrentTaskActivityWorkItems; + /// + EntityBackendProperties IEntityOrchestrationService.EntityBackendProperties => this.entityOrchestrationService?.EntityBackendProperties; + + /// + EntityBackendQueries IEntityOrchestrationService.EntityBackendQueries => this.entityOrchestrationService?.EntityBackendQueries; + + /// + Task IEntityOrchestrationService.LockNextOrchestrationWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken) + => this.entityOrchestrationService.LockNextOrchestrationWorkItemAsync(receiveTimeout, cancellationToken); + + /// + Task IEntityOrchestrationService.LockNextEntityWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken) + => this.entityOrchestrationService.LockNextEntityWorkItemAsync(receiveTimeout, cancellationToken); + internal string GetBackendInfo() { return this.GetOrchestrationService().ToString(); diff --git a/src/WebJobs.Extensions.DurableTask/DurableEntityStatus.cs b/src/WebJobs.Extensions.DurableTask/DurableEntityStatus.cs index 3142fba4e..2bc29598a 100644 --- a/src/WebJobs.Extensions.DurableTask/DurableEntityStatus.cs +++ b/src/WebJobs.Extensions.DurableTask/DurableEntityStatus.cs @@ -3,6 +3,7 @@ using System; using System.Runtime.Serialization; +using DurableTask.Core.Entities; using Newtonsoft.Json; using Newtonsoft.Json.Linq; @@ -40,6 +41,25 @@ internal DurableEntityStatus(DurableOrchestrationStatus orchestrationStatus) } } + internal DurableEntityStatus(EntityBackendQueries.EntityMetadata metadata) + { + this.EntityId = new EntityId(metadata.EntityId.Name, metadata.EntityId.Key); + this.LastOperationTime = metadata.LastModifiedTime; + if (metadata.SerializedState != null) + { + try + { + // Entity state is expected to be JSON-compatible + this.State = JToken.Parse(metadata.SerializedState); + } + catch (JsonException) + { + // Just in case the above assumption is ever wrong, fallback to a raw string + this.State = metadata.SerializedState; + } + } + } + /// /// Gets the EntityId of the queried entity instance. /// diff --git a/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs b/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs index dd351c1fe..3ae312bef 100644 --- a/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs +++ b/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs @@ -441,10 +441,11 @@ void IExtensionConfigProvider.Initialize(ExtensionConfigContext context) { #if FUNCTIONS_V3_OR_GREATER // This is a newer, more performant flavor of orchestration/activity middleware that is being - // enabled for newer language runtimes. Support for entities in this model is TBD. + // enabled for newer language runtimes. var ooprocMiddleware = new OutOfProcMiddleware(this); this.taskHubWorker.AddActivityDispatcherMiddleware(ooprocMiddleware.CallActivityAsync); this.taskHubWorker.AddOrchestrationDispatcherMiddleware(ooprocMiddleware.CallOrchestratorAsync); + this.taskHubWorker.AddEntityDispatcherMiddleware(ooprocMiddleware.CallEntityAsync); #else // This can happen if, for example, a Java user tries to use Durable Functions while targeting V2 or V3 extension bundles // because those bundles target .NET Core 2.2, which doesn't support the gRPC libraries used in the modern out-of-proc implementation. diff --git a/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs b/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs index caa5f04a5..0bc7a9bac 100644 --- a/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs +++ b/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs @@ -9,12 +9,14 @@ using System.Threading; using System.Threading.Tasks; using DurableTask.Core; +using DurableTask.Core.Entities; using DurableTask.Core.History; using DurableTask.Core.Query; using DurableTask.Core.Serializing.Internal; using Google.Protobuf.WellKnownTypes; using Grpc.Core; using Microsoft.Extensions.Hosting; +using DTCore = DurableTask.Core; using P = Microsoft.DurableTask.Protobuf; namespace Microsoft.Azure.WebJobs.Extensions.DurableTask @@ -165,6 +167,95 @@ public override Task Hello(Empty request, ServerCallContext context) return new P.RaiseEventResponse(); } + public async override Task SignalEntity(P.SignalEntityRequest request, ServerCallContext context) + { + this.CheckEntitySupport(context, out var durabilityProvider, out var entityOrchestrationService); + + EntityMessageEvent eventToSend = ClientEntityHelpers.EmitOperationSignal( + new OrchestrationInstance() { InstanceId = request.InstanceId }, + Guid.Parse(request.RequestId), + request.Name, + request.Input, + EntityMessageEvent.GetCappedScheduledTime( + DateTime.UtcNow, + entityOrchestrationService.EntityBackendProperties!.MaximumSignalDelayTime, + request.ScheduledTime?.ToDateTime())); + + await durabilityProvider.SendTaskOrchestrationMessageAsync(eventToSend.AsTaskMessage()); + + // No fields in the response + return new P.SignalEntityResponse(); + } + + public async override Task GetEntity(P.GetEntityRequest request, ServerCallContext context) + { + this.CheckEntitySupport(context, out var durabilityProvider, out var entityOrchestrationService); + + EntityBackendQueries.EntityMetadata? metaData = await entityOrchestrationService.EntityBackendQueries!.GetEntityAsync( + DTCore.Entities.EntityId.FromString(request.InstanceId), + request.IncludeState, + includeStateless: false, + context.CancellationToken); + + return new P.GetEntityResponse() + { + Exists = metaData.HasValue, + Entity = metaData.HasValue ? this.ConvertEntityMetadata(metaData.Value) : default, + }; + } + + public async override Task QueryEntities(P.QueryEntitiesRequest request, ServerCallContext context) + { + this.CheckEntitySupport(context, out var durabilityProvider, out var entityOrchestrationService); + + P.EntityQuery query = request.Query; + EntityBackendQueries.EntityQueryResult result = await entityOrchestrationService.EntityBackendQueries!.QueryEntitiesAsync( + new EntityBackendQueries.EntityQuery() + { + InstanceIdStartsWith = query.InstanceIdStartsWith, + LastModifiedFrom = query.LastModifiedFrom?.ToDateTime(), + LastModifiedTo = query.LastModifiedTo?.ToDateTime(), + IncludeTransient = query.IncludeTransient, + IncludeState = query.IncludeState, + ContinuationToken = query.ContinuationToken, + PageSize = query.PageSize, + }, + context.CancellationToken); + + var response = new P.QueryEntitiesResponse() + { + ContinuationToken = result.ContinuationToken, + }; + + foreach (EntityBackendQueries.EntityMetadata entityMetadata in result.Results) + { + response.Entities.Add(this.ConvertEntityMetadata(entityMetadata)); + } + + return response; + } + + public async override Task CleanEntityStorage(P.CleanEntityStorageRequest request, ServerCallContext context) + { + this.CheckEntitySupport(context, out var durabilityProvider, out var entityOrchestrationService); + + EntityBackendQueries.CleanEntityStorageResult result = await entityOrchestrationService.EntityBackendQueries!.CleanEntityStorageAsync( + new EntityBackendQueries.CleanEntityStorageRequest() + { + RemoveEmptyEntities = request.RemoveEmptyEntities, + ReleaseOrphanedLocks = request.ReleaseOrphanedLocks, + ContinuationToken = request.ContinuationToken, + }, + context.CancellationToken); + + return new P.CleanEntityStorageResponse() + { + EmptyEntitiesRemoved = result.EmptyEntitiesRemoved, + OrphanedLocksReleased = result.OrphanedLocksReleased, + ContinuationToken = result.ContinuationToken, + }; + } + public async override Task TerminateInstance(P.TerminateRequest request, ServerCallContext context) { await this.GetClient(context).TerminateAsync(request.InstanceId, request.Output); @@ -330,6 +421,31 @@ private IDurableClient GetClient(ServerCallContext context) { return this.extension.GetClient(this.GetAttribute(context)); } + + private void CheckEntitySupport(ServerCallContext context, out DurabilityProvider durabilityProvider, out IEntityOrchestrationService entityOrchestrationService) + { + durabilityProvider = this.GetDurabilityProvider(context); + entityOrchestrationService = durabilityProvider; + if (entityOrchestrationService?.EntityBackendProperties == null) + { + throw new RpcException(new Grpc.Core.Status( + Grpc.Core.StatusCode.Unimplemented, + $"Missing entity support for storage backend '{durabilityProvider.GetBackendInfo()}'. Entity support" + + $" may have not been implemented yet, or the selected package version is too old.")); + } + } + + private P.EntityMetadata ConvertEntityMetadata(EntityBackendQueries.EntityMetadata metaData) + { + return new P.EntityMetadata() + { + InstanceId = metaData.EntityId.ToString(), + LastModifiedTime = metaData.LastModifiedTime.ToTimestamp(), + BacklogQueueSize = metaData.BacklogQueueSize, + LockedBy = metaData.LockedBy, + SerializedState = metaData.SerializedState, + }; + } } } } diff --git a/src/WebJobs.Extensions.DurableTask/Microsoft.Azure.WebJobs.Extensions.DurableTask.xml b/src/WebJobs.Extensions.DurableTask/Microsoft.Azure.WebJobs.Extensions.DurableTask.xml index f76c0ad61..0088042e9 100644 --- a/src/WebJobs.Extensions.DurableTask/Microsoft.Azure.WebJobs.Extensions.DurableTask.xml +++ b/src/WebJobs.Extensions.DurableTask/Microsoft.Azure.WebJobs.Extensions.DurableTask.xml @@ -2026,6 +2026,18 @@ + + + + + + + + + + + + @@ -4263,6 +4275,18 @@ A positive integer configured by the host. + + + Gets or sets the maximum number of entity functions that can be processed concurrently on a single host instance. + + + Increasing entity function concurrency can result in increased throughput but can + also increase the total CPU and memory usage on a single worker instance. + + + A positive integer configured by the host. + + Gets or sets a value indicating whether to enable the local RPC endpoint managed by this extension. diff --git a/src/WebJobs.Extensions.DurableTask/Options/DurableTaskOptions.cs b/src/WebJobs.Extensions.DurableTask/Options/DurableTaskOptions.cs index e5eac789f..bc555386f 100644 --- a/src/WebJobs.Extensions.DurableTask/Options/DurableTaskOptions.cs +++ b/src/WebJobs.Extensions.DurableTask/Options/DurableTaskOptions.cs @@ -99,6 +99,18 @@ public string HubName /// public int? MaxConcurrentOrchestratorFunctions { get; set; } = null; + /// + /// Gets or sets the maximum number of entity functions that can be processed concurrently on a single host instance. + /// + /// + /// Increasing entity function concurrency can result in increased throughput but can + /// also increase the total CPU and memory usage on a single worker instance. + /// + /// + /// A positive integer configured by the host. + /// + public int? MaxConcurrentEntityFunctions { get; set; } = null; + /// /// Gets or sets a value indicating whether to enable the local RPC endpoint managed by this extension. /// @@ -328,6 +340,11 @@ internal void Validate(INameResolver environmentVariableResolver, EndToEndTraceH throw new InvalidOperationException($"{nameof(this.MaxConcurrentOrchestratorFunctions)} must be a positive integer value."); } + if (this.MaxConcurrentEntityFunctions <= 0) + { + throw new InvalidOperationException($"{nameof(this.MaxConcurrentEntityFunctions)} must be a positive integer value."); + } + if (this.MaxEntityOperationBatchSize <= 0) { throw new InvalidOperationException($"{nameof(this.MaxEntityOperationBatchSize)} must be a positive integer value."); diff --git a/src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs b/src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs index 1813c131c..65751729b 100644 --- a/src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs +++ b/src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs @@ -3,14 +3,18 @@ #nullable enable #if FUNCTIONS_V3_OR_GREATER using System; +using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.Linq; using System.Threading.Tasks; using DurableTask.Core; +using DurableTask.Core.Entities; +using DurableTask.Core.Entities.OperationFormat; using DurableTask.Core.Exceptions; using DurableTask.Core.History; using DurableTask.Core.Middleware; using Microsoft.Azure.WebJobs.Host.Executors; +using Newtonsoft.Json; namespace Microsoft.Azure.WebJobs.Extensions.DurableTask { @@ -83,6 +87,8 @@ public async Task CallOrchestratorAsync(DispatchMiddlewareContext dispatchContex return; } + TaskOrchestrationEntityParameters? entityParameters = dispatchContext.GetProperty(); + bool isReplaying = runtimeState.PastEvents.Any(); this.TraceHelper.FunctionStarting( @@ -104,7 +110,7 @@ await this.LifeCycleNotificationHelper.OrchestratorStartingAsync( isReplay: false); } - var context = new RemoteOrchestratorContext(runtimeState); + var context = new RemoteOrchestratorContext(runtimeState, entityParameters); var input = new TriggeredFunctionData { @@ -235,6 +241,166 @@ await this.LifeCycleNotificationHelper.OrchestratorFailedAsync( dispatchContext.SetProperty(orchestratorResult); } + /// + /// Durable Task Framework entity middleware that invokes an out-of-process orchestrator function. + /// + /// This middleware context provided by the framework that contains information about the entity. + /// The next middleware handler in the pipeline. + /// Thrown if there is a recoverable error in the Functions runtime that's expected to be handled gracefully. + public async Task CallEntityAsync(DispatchMiddlewareContext dispatchContext, Func next) + { + EntityBatchRequest? batchRequest = dispatchContext.GetProperty(); + + if (batchRequest == null) + { + // This should never happen, and there's no good response we can return if it does. + throw new InvalidOperationException($"An entity was scheduled but no {nameof(EntityBatchRequest)} was found!"); + } + + if (batchRequest.InstanceId == null) + { + // This should never happen, and there's no good response we can return if it does. + throw new InvalidOperationException($"An entity was scheduled but InstanceId is null!"); + } + + EntityId entityId = EntityId.GetEntityIdFromSchedulerId(batchRequest.InstanceId); + FunctionName functionName = new FunctionName(entityId.EntityName); + RegisteredFunctionInfo functionInfo = this.extension.GetEntityInfo(functionName); + + void SetErrorResult(FailureDetails failureDetails) + { + // Returns a result with no operation results and no state change, + // and with failure details that explain what error was encountered. + dispatchContext.SetProperty(new EntityBatchResult() + { + Actions = new List(), + Results = new List(), + EntityState = batchRequest!.EntityState, + FailureDetails = failureDetails, + }); + } + + if (functionInfo == null) + { + SetErrorResult(new FailureDetails( + errorType: "EntityFunctionNotFound", + errorMessage: this.extension.GetInvalidEntityFunctionMessage(functionName.Name), + stackTrace: null, + innerFailure: null, + isNonRetriable: true)); + return; + } + + this.TraceHelper.FunctionStarting( + this.Options.HubName, + functionName.Name, + batchRequest.InstanceId, + this.extension.GetIntputOutputTrace(batchRequest.EntityState), + functionType: FunctionType.Entity, + isReplay: false); + + var context = new RemoteEntityContext(batchRequest); + + var input = new TriggeredFunctionData + { + TriggerValue = context, +#pragma warning disable CS0618 // Type or member is obsolete (not intended for general public use) + InvokeHandler = async functionInvoker => + { + // Invoke the function and look for a return value. Trigger return values are an undocumented feature that we depend on. + Task invokeTask = functionInvoker(); + if (invokeTask is not Task invokeTaskWithResult) + { + // This should never happen + throw new InvalidOperationException("The internal function invoker returned a task that does not support return values!"); + } + + // The return value is expected to be a base64 string containing the protobuf-encoding of the batch result. + string? triggerReturnValue = (await invokeTaskWithResult) as string; + if (string.IsNullOrEmpty(triggerReturnValue)) + { + throw new InvalidOperationException( + "The function invocation resulted in a null response. This means that either the entity function was implemented " + + "incorrectly, the Durable Task language SDK was implemented incorrectly, or that the destination language worker is not " + + "sending the function result back to the host."); + } + + byte[] triggerReturnValueBytes = Convert.FromBase64String(triggerReturnValue); + var response = Microsoft.DurableTask.Protobuf.EntityBatchResult.Parser.ParseFrom(triggerReturnValueBytes); + context.Result = response.ToEntityBatchResult(); + +#pragma warning restore CS0618 // Type or member is obsolete (not intended for general public use) + }, + }; + + FunctionResult functionResult; + try + { + functionResult = await functionInfo.Executor.TryExecuteAsync( + input, + cancellationToken: this.HostLifetimeService.OnStopping); + + if (!functionResult.Succeeded) + { + // Shutdown can surface as a completed invocation in a failed state. + // Re-throw so we can abort this invocation. + this.HostLifetimeService.OnStopping.ThrowIfCancellationRequested(); + } + } + catch (Exception hostRuntimeException) + { + string reason = this.HostLifetimeService.OnStopping.IsCancellationRequested ? + "The Functions/WebJobs runtime is shutting down!" : + $"Unhandled exception in the Functions/WebJobs runtime: {hostRuntimeException}"; + + this.TraceHelper.FunctionAborted( + this.Options.HubName, + functionName.Name, + batchRequest.InstanceId, + reason, + functionType: FunctionType.Entity); + + // This will abort the current execution and force an durable retry + throw new SessionAbortedException(reason); + } + + if (!functionResult.Succeeded) + { + this.TraceHelper.FunctionFailed( + this.Options.HubName, + functionName.Name, + batchRequest.InstanceId, + functionResult.Exception.ToString(), + FunctionType.Orchestrator, + isReplay: false); + + SetErrorResult(new FailureDetails( + errorType: "FunctionInvocationFailed", + errorMessage: $"Invocation of function '{functionName}' failed with an exception.", + stackTrace: null, + innerFailure: new FailureDetails(functionResult.Exception), + isNonRetriable: true)); + + return; + } + + EntityBatchResult batchResult = context.Result + ?? throw new InvalidOperationException($"The entity function executed successfully but {nameof(context.Result)} is still null!"); + + this.TraceHelper.FunctionCompleted( + this.Options.HubName, + functionName.Name, + batchRequest.InstanceId, + this.extension.GetIntputOutputTrace(batchRequest.EntityState), + batchResult.EntityState != null, + FunctionType.Entity, + isReplay: false); + + // Send the result of the orchestrator function to the DTFx dispatch pipeline. + // This allows us to bypass the default, in-process execution and process the given results immediately. + dispatchContext.SetProperty(batchResult); + } + /// /// Durable Task Framework activity middleware that invokes an out-of-process orchestrator function. /// diff --git a/src/WebJobs.Extensions.DurableTask/ProtobufUtils.cs b/src/WebJobs.Extensions.DurableTask/ProtobufUtils.cs index b4cd992ef..57b84012b 100644 --- a/src/WebJobs.Extensions.DurableTask/ProtobufUtils.cs +++ b/src/WebJobs.Extensions.DurableTask/ProtobufUtils.cs @@ -5,10 +5,13 @@ using System; using System.Buffers; using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; using System.IO; using System.Linq; using DurableTask.Core; using DurableTask.Core.Command; +using DurableTask.Core.Entities; +using DurableTask.Core.Entities.OperationFormat; using DurableTask.Core.History; using DurableTask.Core.Query; using Google.Protobuf; @@ -298,6 +301,20 @@ public static OrchestratorAction ToOrchestratorAction(P.OrchestratorAction a) } } + [return: NotNullIfNotNull("parameters")] + public static P.OrchestratorEntityParameters? ToProtobuf(this TaskOrchestrationEntityParameters? parameters) + { + if (parameters == null) + { + return null; + } + + return new P.OrchestratorEntityParameters + { + EntityMessageReorderWindow = Duration.FromTimeSpan(parameters.EntityMessageReorderWindow), + }; + } + public static string Base64Encode(IMessage message) { // Create a serialized payload using lower-level protobuf APIs. We do this to avoid allocating @@ -363,6 +380,7 @@ internal static OrchestrationQuery ToOrchestrationQuery(P.QueryInstancesRequest ContinuationToken = request.Query.ContinuationToken, InstanceIdPrefix = request.Query.InstanceIdPrefix, FetchInputsAndOutputs = request.Query.FetchInputsAndOutputs, + ExcludeEntities = true, }; // Empty lists are not allowed by the underlying code that takes in an OrchestrationQuery. However, @@ -430,6 +448,147 @@ internal static P.PurgeInstancesResponse CreatePurgeInstancesResponse(PurgeResul DeletedInstanceCount = result.DeletedInstanceCount, }; } + + /// + /// Converts a to . + /// + /// The operation request to convert. + /// The converted operation request. + [return: NotNullIfNotNull("entityBatchRequest")] + internal static P.EntityBatchRequest? ToEntityBatchRequest(this EntityBatchRequest? entityBatchRequest) + { + if (entityBatchRequest == null) + { + return null; + } + + var batchRequest = new P.EntityBatchRequest() + { + InstanceId = entityBatchRequest.InstanceId, + EntityState = entityBatchRequest.EntityState, + }; + + foreach (var operation in entityBatchRequest.Operations ?? Enumerable.Empty()) + { + batchRequest.Operations.Add(operation.ToOperationRequest()); + } + + return batchRequest; + } + + /// + /// Converts a to . + /// + /// The operation request to convert. + /// The converted operation request. + [return: NotNullIfNotNull("operationRequest")] + internal static P.OperationRequest? ToOperationRequest(this OperationRequest? operationRequest) + { + if (operationRequest == null) + { + return null; + } + + return new P.OperationRequest() + { + Operation = operationRequest.Operation, + Input = operationRequest.Input, + RequestId = operationRequest.Id.ToString(), + }; + } + + /// + /// Converts a to a . + /// + /// The operation result to convert. + /// The converted operation result. + [return: NotNullIfNotNull("entityBatchResult")] + internal static EntityBatchResult? ToEntityBatchResult(this P.EntityBatchResult? entityBatchResult) + { + if (entityBatchResult == null) + { + return null; + } + + return new EntityBatchResult() + { + Actions = entityBatchResult.Actions.Select(operationAction => operationAction!.ToOperationAction()).ToList(), + EntityState = entityBatchResult.EntityState, + Results = entityBatchResult.Results.Select(operationResult => operationResult!.ToOperationResult()).ToList(), + FailureDetails = GetFailureDetails(entityBatchResult.FailureDetails), + }; + } + + /// + /// Converts a to a . + /// + /// The operation action to convert. + /// The converted operation action. + [return: NotNullIfNotNull("operationAction")] + internal static OperationAction? ToOperationAction(this P.OperationAction? operationAction) + { + if (operationAction == null) + { + return null; + } + + switch (operationAction.OperationActionTypeCase) + { + case P.OperationAction.OperationActionTypeOneofCase.SendSignal: + + return new SendSignalOperationAction() + { + Name = operationAction.SendSignal.Name, + Input = operationAction.SendSignal.Input, + InstanceId = operationAction.SendSignal.InstanceId, + ScheduledTime = operationAction.SendSignal.ScheduledTime?.ToDateTime(), + }; + + case P.OperationAction.OperationActionTypeOneofCase.StartNewOrchestration: + + return new StartNewOrchestrationOperationAction() + { + Name = operationAction.StartNewOrchestration.Name, + Input = operationAction.StartNewOrchestration.Input, + InstanceId = operationAction.StartNewOrchestration.InstanceId, + Version = operationAction.StartNewOrchestration.Version, + }; + default: + throw new NotSupportedException($"Deserialization of {operationAction.OperationActionTypeCase} is not supported."); + } + } + + /// + /// Converts a to a . + /// + /// The operation result to convert. + /// The converted operation result. + [return: NotNullIfNotNull("operationResult")] + internal static OperationResult? ToOperationResult(this P.OperationResult? operationResult) + { + if (operationResult == null) + { + return null; + } + + switch (operationResult.ResultTypeCase) + { + case P.OperationResult.ResultTypeOneofCase.Success: + return new OperationResult() + { + Result = operationResult.Success.Result, + }; + + case P.OperationResult.ResultTypeOneofCase.Failure: + return new OperationResult() + { + FailureDetails = GetFailureDetails(operationResult.Failure.FailureDetails), + }; + + default: + throw new NotSupportedException($"Deserialization of {operationResult.ResultTypeCase} is not supported."); + } + } } } #endif \ No newline at end of file diff --git a/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj b/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj index 03f4eebad..f3ebdb994 100644 --- a/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj +++ b/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj @@ -5,9 +5,9 @@ Microsoft.Azure.WebJobs.Extensions.DurableTask Microsoft.Azure.WebJobs.Extensions.DurableTask 2 - 12 + 13 0 - $(MajorVersion).$(MinorVersion).$(PatchVersion) + $(MajorVersion).$(MinorVersion).$(PatchVersion)-preview.1 $(MajorVersion).$(MinorVersion).$(PatchVersion) $(MajorVersion).0.0.0 Microsoft Corporation @@ -99,13 +99,14 @@ - + + - - + + diff --git a/src/Worker.Extensions.DurableTask/AssemblyInfo.cs b/src/Worker.Extensions.DurableTask/AssemblyInfo.cs index d455ef9c6..ecf1d0ed5 100644 --- a/src/Worker.Extensions.DurableTask/AssemblyInfo.cs +++ b/src/Worker.Extensions.DurableTask/AssemblyInfo.cs @@ -4,4 +4,4 @@ using Microsoft.Azure.Functions.Worker.Extensions.Abstractions; // TODO: Find a way to generate this dynamically at build-time -[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.DurableTask", "2.12.*")] +[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.DurableTask", "2.13.0-preview.1")] diff --git a/src/Worker.Extensions.DurableTask/DurableTaskExtensionStartup.cs b/src/Worker.Extensions.DurableTask/DurableTaskExtensionStartup.cs index 516fd41af..e1edbd095 100644 --- a/src/Worker.Extensions.DurableTask/DurableTaskExtensionStartup.cs +++ b/src/Worker.Extensions.DurableTask/DurableTaskExtensionStartup.cs @@ -30,6 +30,7 @@ public override void Configure(IFunctionsWorkerApplicationBuilder applicationBui { applicationBuilder.Services.AddSingleton(); applicationBuilder.Services.AddOptions() + .Configure(options => options.EnableEntitySupport = true) .PostConfigure((opt, sp) => { if (GetConverter(sp) is DataConverter converter) @@ -39,6 +40,7 @@ public override void Configure(IFunctionsWorkerApplicationBuilder applicationBui }); applicationBuilder.Services.AddOptions() + .Configure(options => options.EnableEntitySupport = true) .PostConfigure((opt, sp) => { if (GetConverter(sp) is DataConverter converter) diff --git a/src/Worker.Extensions.DurableTask/DurableTaskFunctionsMiddleware.cs b/src/Worker.Extensions.DurableTask/DurableTaskFunctionsMiddleware.cs index 24d2be295..3da4d35d5 100644 --- a/src/Worker.Extensions.DurableTask/DurableTaskFunctionsMiddleware.cs +++ b/src/Worker.Extensions.DurableTask/DurableTaskFunctionsMiddleware.cs @@ -15,41 +15,83 @@ namespace Microsoft.Azure.Functions.Worker.Extensions.DurableTask; internal class DurableTaskFunctionsMiddleware : IFunctionsWorkerMiddleware { /// - public async Task Invoke(FunctionContext functionContext, FunctionExecutionDelegate next) + public Task Invoke(FunctionContext functionContext, FunctionExecutionDelegate next) { - if (!IsOrchestrationTrigger(functionContext, out BindingMetadata? triggerMetadata)) + if (IsOrchestrationTrigger(functionContext, out BindingMetadata? triggerBinding)) { - await next(functionContext); - return; + return RunOrchestrationAsync(functionContext, triggerBinding, next); } - InputBindingData triggerInputData = await functionContext.BindInputAsync(triggerMetadata); + if (IsEntityTrigger(functionContext, out triggerBinding)) + { + return RunEntityAsync(functionContext, triggerBinding, next); + } + + return next(functionContext); + } + + private static bool IsOrchestrationTrigger( + FunctionContext context, [NotNullWhen(true)] out BindingMetadata? orchestrationTriggerBinding) + { + foreach (BindingMetadata binding in context.FunctionDefinition.InputBindings.Values) + { + if (string.Equals(binding.Type, "orchestrationTrigger", StringComparison.OrdinalIgnoreCase)) + { + orchestrationTriggerBinding = binding; + return true; + } + } + + orchestrationTriggerBinding = null; + return false; + } + + static async Task RunOrchestrationAsync( + FunctionContext context, BindingMetadata triggerBinding, FunctionExecutionDelegate next) + { + InputBindingData triggerInputData = await context.BindInputAsync(triggerBinding); if (triggerInputData?.Value is not string encodedOrchestratorState) { throw new InvalidOperationException("Orchestration history state was either missing from the input or not a string value."); } - FunctionsOrchestrator orchestrator = new(functionContext, next, triggerInputData); + FunctionsOrchestrator orchestrator = new(context, next, triggerInputData); string orchestratorOutput = GrpcOrchestrationRunner.LoadAndRun( - encodedOrchestratorState, orchestrator, functionContext.InstanceServices); + encodedOrchestratorState, orchestrator, context.InstanceServices); // Send the encoded orchestrator output as the return value seen by the functions host extension - functionContext.GetInvocationResult().Value = orchestratorOutput; + context.GetInvocationResult().Value = orchestratorOutput; } - private static bool IsOrchestrationTrigger( - FunctionContext context, [NotNullWhen(true)] out BindingMetadata? orchestrationTriggerBinding) + private static bool IsEntityTrigger( + FunctionContext context, [NotNullWhen(true)] out BindingMetadata? entityTriggerBinding) { foreach (BindingMetadata binding in context.FunctionDefinition.InputBindings.Values) { - if (string.Equals(binding.Type, "orchestrationTrigger")) + if (string.Equals(binding.Type, "entityTrigger", StringComparison.OrdinalIgnoreCase)) { - orchestrationTriggerBinding = binding; + entityTriggerBinding = binding; return true; } } - orchestrationTriggerBinding = null; + entityTriggerBinding = null; return false; } + + static async Task RunEntityAsync( + FunctionContext context, BindingMetadata triggerBinding, FunctionExecutionDelegate next) + { + InputBindingData triggerInputData = await context.BindInputAsync(triggerBinding); + if (triggerInputData?.Value is not string encodedEntityBatch) + { + throw new InvalidOperationException("Entity batch was either missing from the input or not a string value."); + } + + TaskEntityDispatcher dispatcher = new(encodedEntityBatch, context.InstanceServices); + triggerInputData.Value = dispatcher; + + await next(context); + context.GetInvocationResult().Value = dispatcher.Result; + } } diff --git a/src/Worker.Extensions.DurableTask/EntityTriggerAttribute.cs b/src/Worker.Extensions.DurableTask/EntityTriggerAttribute.cs new file mode 100644 index 000000000..8b9d450b3 --- /dev/null +++ b/src/Worker.Extensions.DurableTask/EntityTriggerAttribute.cs @@ -0,0 +1,32 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Diagnostics; +using Microsoft.Azure.Functions.Worker.Extensions.Abstractions; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; + +namespace Microsoft.Azure.Functions.Worker; + +/// +/// Trigger attribute used for durable entity functions. +/// +/// +/// Entity triggers must bind to . +/// +[AttributeUsage(AttributeTargets.Parameter)] +[DebuggerDisplay("{EntityName}")] +public sealed class EntityTriggerAttribute : TriggerBindingAttribute +{ + /// + /// Gets or sets the name of the entity function. + /// + /// + /// If not specified, the function name is used as the name of the entity. + /// This property supports binding parameters. + /// + /// + /// The name of the entity function or null to use the function name. + /// + public string? EntityName { get; set; } +} diff --git a/src/Worker.Extensions.DurableTask/FunctionsDurableClientProvider.cs b/src/Worker.Extensions.DurableTask/FunctionsDurableClientProvider.cs index 03edaf463..2169306a7 100644 --- a/src/Worker.Extensions.DurableTask/FunctionsDurableClientProvider.cs +++ b/src/Worker.Extensions.DurableTask/FunctionsDurableClientProvider.cs @@ -136,6 +136,7 @@ public DurableTaskClient GetClient(Uri endpoint, string? taskHub, string? connec { Channel = channel, DataConverter = this.options.DataConverter, + EnableEntitySupport = this.options.EnableEntitySupport, }; ILogger logger = this.loggerFactory.CreateLogger(); diff --git a/src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs b/src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs index 234192ece..d675f2ab2 100644 --- a/src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs +++ b/src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs @@ -5,6 +5,7 @@ using System.Threading.Tasks; using Microsoft.DurableTask; using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Client.Entities; namespace Microsoft.Azure.Functions.Worker; @@ -25,6 +26,8 @@ public FunctionsDurableTaskClient(DurableTaskClient inner, string? queryString) public string? QueryString { get; } + public override DurableEntityClient Entities => this.inner.Entities; + public override ValueTask DisposeAsync() { // We do not dispose inner client as it has a longer life than this class. diff --git a/src/Worker.Extensions.DurableTask/FunctionsOrchestrationContext.EntityFeature.cs b/src/Worker.Extensions.DurableTask/FunctionsOrchestrationContext.EntityFeature.cs new file mode 100644 index 000000000..67d10e324 --- /dev/null +++ b/src/Worker.Extensions.DurableTask/FunctionsOrchestrationContext.EntityFeature.cs @@ -0,0 +1,58 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.Threading.Tasks; +using Microsoft.DurableTask.Entities; + +namespace Microsoft.Azure.Functions.Worker.Extensions.DurableTask; + +internal sealed partial class FunctionsOrchestrationContext +{ + private class EntityFeature : TaskOrchestrationEntityFeature + { + private readonly FunctionsOrchestrationContext parent; + private readonly TaskOrchestrationEntityFeature inner; + + public EntityFeature(FunctionsOrchestrationContext parent, TaskOrchestrationEntityFeature inner) + { + this.parent = parent; + this.inner = inner; + } + + public override Task CallEntityAsync( + EntityInstanceId id, string operationName, object? input = null, CallEntityOptions? options = null) + { + this.parent.EnsureLegalAccess(); + return this.inner.CallEntityAsync(id, operationName, input, options); + } + + public override Task CallEntityAsync( + EntityInstanceId id, string operationName, object? input = null, CallEntityOptions? options = null) + { + this.parent.EnsureLegalAccess(); + return this.inner.CallEntityAsync(id, operationName, input, options); + } + + public override Task SignalEntityAsync( + EntityInstanceId id, string operationName, object? input = null, SignalEntityOptions? options = null) + { + this.parent.EnsureLegalAccess(); + return this.inner.SignalEntityAsync(id, operationName, input, options); + } + + public override bool InCriticalSection([NotNullWhen(true)] out IReadOnlyList? entityIds) + { + this.parent.EnsureLegalAccess(); + return this.inner.InCriticalSection(out entityIds); + } + + public override Task LockEntitiesAsync(IEnumerable entityIds) + { + this.parent.EnsureLegalAccess(); + return this.inner.LockEntitiesAsync(entityIds); + } + } +} diff --git a/src/Worker.Extensions.DurableTask/FunctionsOrchestrationContext.cs b/src/Worker.Extensions.DurableTask/FunctionsOrchestrationContext.cs index 7083993f5..53b7cca9e 100644 --- a/src/Worker.Extensions.DurableTask/FunctionsOrchestrationContext.cs +++ b/src/Worker.Extensions.DurableTask/FunctionsOrchestrationContext.cs @@ -6,6 +6,7 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.DurableTask; +using Microsoft.DurableTask.Entities; using Microsoft.DurableTask.Worker; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -22,6 +23,7 @@ internal sealed partial class FunctionsOrchestrationContext : TaskOrchestrationC private readonly DurableTaskWorkerOptions options; private InputConverter? inputConverter; + private EntityFeature? entities; public FunctionsOrchestrationContext(TaskOrchestrationContext innerContext, FunctionContext functionContext) { @@ -48,6 +50,9 @@ public FunctionsOrchestrationContext(TaskOrchestrationContext innerContext, Func protected override ILoggerFactory LoggerFactory { get; } + public override TaskOrchestrationEntityFeature Entities => + this.entities ??= new EntityFeature(this, this.innerContext.Entities); + public override T GetInput() { this.EnsureLegalAccess(); @@ -115,15 +120,6 @@ public override Task WaitForExternalEvent(string eventName, CancellationTo return this.innerContext.WaitForExternalEvent(eventName, cancellationToken); } - /// - /// Throws if accessed by a non-orchestrator thread or marks the current object as accessed successfully. - /// - private void EnsureLegalAccess() - { - this.ThrowIfIllegalAccess(); - this.IsAccessed = true; - } - internal void ThrowIfIllegalAccess() { // Only the orchestrator thread is allowed to run the task continuation. If we detect that some other thread @@ -145,4 +141,13 @@ internal void ThrowIfIllegalAccess() } } } + + /// + /// Throws if accessed by a non-orchestrator thread or marks the current object as accessed successfully. + /// + private void EnsureLegalAccess() + { + this.ThrowIfIllegalAccess(); + this.IsAccessed = true; + } } diff --git a/src/Worker.Extensions.DurableTask/OrchestrationInputConverter.cs b/src/Worker.Extensions.DurableTask/OrchestrationInputConverter.cs index 9ed0e040a..8ba105b16 100644 --- a/src/Worker.Extensions.DurableTask/OrchestrationInputConverter.cs +++ b/src/Worker.Extensions.DurableTask/OrchestrationInputConverter.cs @@ -61,7 +61,7 @@ public ValueTask ConvertAsync(ConverterContext context) // 3. The TargetType matches our cached type. // If these are met, then we assume this parameter is the orchestration input. if (context.Source is null - && context.FunctionContext.Items.TryGetValue(OrchestrationInputKey, out object value) + && context.FunctionContext.Items.TryGetValue(OrchestrationInputKey, out object? value) && context.TargetType == value?.GetType()) { // Remove this from the items so we bind this only once. diff --git a/src/Worker.Extensions.DurableTask/TaskEntityDispatcher.cs b/src/Worker.Extensions.DurableTask/TaskEntityDispatcher.cs new file mode 100644 index 000000000..1fe449c23 --- /dev/null +++ b/src/Worker.Extensions.DurableTask/TaskEntityDispatcher.cs @@ -0,0 +1,104 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Threading.Tasks; +using Microsoft.DurableTask.Entities; +using Microsoft.DurableTask.Worker.Grpc; +using Microsoft.Extensions.DependencyInjection; + +namespace Microsoft.Azure.Functions.Worker; + +/// +/// Represents a task entity dispatch invocation. +/// +/// +/// This type is used to aid in dispatching a to the operation receiver object. +/// +public sealed class TaskEntityDispatcher +{ + private readonly string request; + private readonly IServiceProvider services; + + internal TaskEntityDispatcher(string request, IServiceProvider services) + { + this.request = request; + this.services = services; + } + + internal string Result { get; private set; } = string.Empty; + + /// + /// Dispatches this entity trigger to the provided . + /// + /// The task entity to dispatch to. + /// A task that completes when the dispatch has finished. + public async Task DispatchAsync(ITaskEntity entity) + { + if (entity == null) + { + throw new ArgumentNullException(nameof(entity)); + } + + this.Result = await GrpcEntityRunner.LoadAndRunAsync(this.request, entity); + } + + /// + /// Dispatches the entity trigger to an instance of the provided . + /// + /// If is a , it will be activated from + /// and then be dispatched to. + /// + /// + /// If is not , it is assumed the + /// represents the entity state and it will be deserialized and dispatched directly to the state. + /// + /// + /// The type to dispatch to. + /// A task that completes when the dispatch has finished. + public Task DispatchAsync() + { + if (typeof(ITaskEntity).IsAssignableFrom(typeof(T))) + { + ITaskEntity entity = (ITaskEntity)ActivatorUtilities.GetServiceOrCreateInstance(this.services)!; + return this.DispatchAsync(entity); + } + + return this.DispatchAsync(new StateEntity()); + } + + /// + /// Dispatches the entity trigger to the provided callback. + /// + /// The callback to handle the entity operation(s). + /// A task that completes when the operation(s) have finished. + public Task DispatchAsync(Func> handler) + { + if (handler is null) + { + throw new ArgumentNullException(nameof(handler)); + } + + return this.DispatchAsync(new DelegateEntity(handler)); + } + + private class StateEntity : TaskEntity + { + protected override bool AllowStateDispatch => true; + } + + private class DelegateEntity : ITaskEntity + { + private readonly Func> handler; + + public DelegateEntity(Func> handler) + { + this.handler = handler; + } + + public ValueTask RunAsync(TaskEntityOperation operation) + { + return this.handler(operation); + } + } +} diff --git a/src/Worker.Extensions.DurableTask/Worker.Extensions.DurableTask.csproj b/src/Worker.Extensions.DurableTask/Worker.Extensions.DurableTask.csproj index f31cc4b9e..c74301555 100644 --- a/src/Worker.Extensions.DurableTask/Worker.Extensions.DurableTask.csproj +++ b/src/Worker.Extensions.DurableTask/Worker.Extensions.DurableTask.csproj @@ -29,17 +29,18 @@ ..\..\sign.snk - 1.0.4 + 1.1.0 + preview.1 $(VersionPrefix).0 $(VersionPrefix).$(FileVersionRevision) - - - - + + + + diff --git a/test/SmokeTests/OOProcSmokeTests/DotNetIsolated/Counter.cs b/test/SmokeTests/OOProcSmokeTests/DotNetIsolated/Counter.cs new file mode 100644 index 000000000..8773d5a86 --- /dev/null +++ b/test/SmokeTests/OOProcSmokeTests/DotNetIsolated/Counter.cs @@ -0,0 +1,149 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System.Text; +using Azure.Core; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; +using Microsoft.Azure.Functions.Worker.Http; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Entities; +using Microsoft.Extensions.Logging; + +namespace DotNetIsolated; + + +/// +/// A simple counter, demonstrating entity use. +/// +public class Counter +{ + public int CurrentValue { get; set; } + + public void Add(int amount) + { + this.CurrentValue += amount; + } + + public void Reset() + { + this.CurrentValue = 0; + } + + public int Get() + { + return this.CurrentValue; + } + + [Function(nameof(Counter))] + public static Task CounterEntity([EntityTrigger] TaskEntityDispatcher dispatcher) + { + return dispatcher.DispatchAsync(); + } +} + +/// +/// Provides three http triggers to test the counter entity. +/// +/// +/// (POST) send 5 increment signals to the counter instance @counter@aa: +/// curl http://localhost:7071/api/counter/aa -d '5' +/// (GET) read the current value of the counter instance @counter@aa: +/// curl http://localhost:7071/api/counter/aa +/// (DELETE) delete the counter instance @counter@aa: +/// curl http://localhost:7071/api/counter/aa -X delete +/// +public static class CounterTest +{ + [Function(nameof(SignalCounter))] + public static async Task SignalCounter( + [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "counter/{id}")] HttpRequestData request, + [DurableClient] DurableTaskClient client, + FunctionContext executionContext, + CancellationToken cancellation, + string id) + { + ILogger logger = executionContext.GetLogger(nameof(Counter)); + + using StreamReader reader = new StreamReader(request.Body, Encoding.UTF8); + string body = await reader.ReadToEndAsync(); + if (! int.TryParse(body, out var count)) + { + var httpResponse = request.CreateResponse(System.Net.HttpStatusCode.BadRequest); + httpResponse.Headers.Add("Content-Type", "text/plain; charset=utf-8"); + httpResponse.WriteString($"Request body must contain an integer that indicates the number of signals to send.\n"); + return httpResponse; + }; + + var entityId = new EntityInstanceId("Counter", id); + logger.LogInformation($"Sending {count} increment messages to {entityId}..."); + + await Parallel.ForEachAsync( + Enumerable.Range(0, count), + cancellation, + (int i, CancellationToken cancellation) => + { + return new ValueTask(client.Entities.SignalEntityAsync(entityId, "add", 1, cancellation:cancellation)); + }); + + logger.LogInformation($"Sent {count} increment messages to {entityId}."); + return request.CreateResponse(System.Net.HttpStatusCode.Accepted); + } + + [Function(nameof(ReadCounter))] + public static async Task ReadCounter( + [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "counter/{id}")] HttpRequestData request, + [DurableClient] DurableTaskClient client, + FunctionContext executionContext, + string id) + { + ILogger logger = executionContext.GetLogger(nameof(Counter)); + var entityId = new EntityInstanceId("Counter", id); + + logger.LogInformation($"Reading state of {entityId}..."); + var response = await client.Entities.GetEntityAsync(entityId, includeState: true); + if (response?.IncludesState ?? false) + { + logger.LogInformation("Entity does not exist."); + } + else + { + logger.LogInformation("Entity state is: {State}", response!.State.Value); + } + + if (response == null) + { + return request.CreateResponse(System.Net.HttpStatusCode.NotFound); + } + else + { + int currentValue = response.State.ReadAs()!.CurrentValue; + var httpResponse = request.CreateResponse(System.Net.HttpStatusCode.OK); + httpResponse.Headers.Add("Content-Type", "text/plain; charset=utf-8"); + httpResponse.WriteString($"{currentValue}\n"); + return httpResponse; + } + } + + [Function(nameof(DeleteCounter))] + public static async Task DeleteCounter( + [HttpTrigger(AuthorizationLevel.Anonymous, "delete", Route = "counter/{id}")] HttpRequestData request, + [DurableClient] DurableTaskClient client, + FunctionContext executionContext, + string id) + { + ILogger logger = executionContext.GetLogger(nameof(Counter)); + var entityId = new EntityInstanceId("Counter", id); + logger.LogInformation($"Deleting {entityId}..."); + + // All entities have a "delete" operation built in, so we can just send a signal + await client.Entities.SignalEntityAsync(entityId, "delete"); + + logger.LogInformation($"Sent deletion signal to {entityId}."); + return request.CreateResponse(System.Net.HttpStatusCode.OK); + } +} + + + diff --git a/test/SmokeTests/OOProcSmokeTests/DotNetIsolated/DotNetIsolated.csproj b/test/SmokeTests/OOProcSmokeTests/DotNetIsolated/DotNetIsolated.csproj index 9dfbda77f..c909ce880 100644 --- a/test/SmokeTests/OOProcSmokeTests/DotNetIsolated/DotNetIsolated.csproj +++ b/test/SmokeTests/OOProcSmokeTests/DotNetIsolated/DotNetIsolated.csproj @@ -1,4 +1,4 @@ - + net6.0 v4 From 9f750ebc77fda1f52c5042dcdb41e6eb52ee7fdb Mon Sep 17 00:00:00 2001 From: Jacob Viau Date: Tue, 24 Oct 2023 10:04:16 -0700 Subject: [PATCH 3/8] Update to preview 2 (#2650) --- release_notes.md | 6 ++---- .../Worker.Extensions.DurableTask.csproj | 6 +++--- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/release_notes.md b/release_notes.md index 8069fb780..6738d3ab6 100644 --- a/release_notes.md +++ b/release_notes.md @@ -1,18 +1,16 @@ # Release Notes -## Microsoft.Azure.Functions.Worker.Extensions.DurableTask v1.1.0-preview.1 +## Microsoft.Azure.Functions.Worker.Extensions.DurableTask v1.1.0-preview.2 ### New Features -- Support entities for .NET isolated - ### Bug Fixes ### Breaking Changes ### Dependency Updates -`Microsoft.DurableTask.*` to `1.1.0-preview.1` +`Microsoft.DurableTask.*` to `1.1.0-preview.2` ## Microsoft.Azure.WebJobs.Extensions.DurableTask v2.12.0-preview.1 diff --git a/src/Worker.Extensions.DurableTask/Worker.Extensions.DurableTask.csproj b/src/Worker.Extensions.DurableTask/Worker.Extensions.DurableTask.csproj index c74301555..3f63320c5 100644 --- a/src/Worker.Extensions.DurableTask/Worker.Extensions.DurableTask.csproj +++ b/src/Worker.Extensions.DurableTask/Worker.Extensions.DurableTask.csproj @@ -30,7 +30,7 @@ 1.1.0 - preview.1 + preview.2 $(VersionPrefix).0 $(VersionPrefix).$(FileVersionRevision) @@ -39,8 +39,8 @@ - - + + From b5c70e5e40d12a1aed281ada1c4d5651608df247 Mon Sep 17 00:00:00 2001 From: Jacob Viau Date: Wed, 8 Nov 2023 10:04:30 -0800 Subject: [PATCH 4/8] Log orchestration error (#2657) --- .../FunctionsOrchestrator.cs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/Worker.Extensions.DurableTask/FunctionsOrchestrator.cs b/src/Worker.Extensions.DurableTask/FunctionsOrchestrator.cs index 30513336e..62cfae22d 100644 --- a/src/Worker.Extensions.DurableTask/FunctionsOrchestrator.cs +++ b/src/Worker.Extensions.DurableTask/FunctionsOrchestrator.cs @@ -5,6 +5,7 @@ using System.Threading.Tasks; using Microsoft.Azure.Functions.Worker.Middleware; using Microsoft.DurableTask; +using Microsoft.Extensions.Logging; namespace Microsoft.Azure.Functions.Worker.Extensions.DurableTask; @@ -44,8 +45,19 @@ public FunctionsOrchestrator( this.contextBinding.Value = wrapperContext; this.inputContext.PrepareInput(input); - // This method will advance to the next middleware and throw if it detects an asynchronous execution. - await EnsureSynchronousExecution(this.functionContext, this.next, wrapperContext); + try + { + // This method will advance to the next middleware and throw if it detects an asynchronous execution. + await EnsureSynchronousExecution(this.functionContext, this.next, wrapperContext); + } + catch (Exception ex) + { + this.functionContext.GetLogger().LogError( + ex, + "An error occurred while executing the orchestrator function '{FunctionName}'.", + this.functionContext.FunctionDefinition.Name); + throw; + } // Set the raw function output as the orchestrator output object? functionOutput = this.functionContext.GetInvocationResult().Value; From 391486a81f8002e4e3606a958d7f3c79b0fdb84d Mon Sep 17 00:00:00 2001 From: Jacob Viau Date: Thu, 9 Nov 2023 12:46:04 -0800 Subject: [PATCH 5/8] Expose OOP invocation errors (#2654) * Expose OOP invocation errors * Add EntityFailureException * EnsureSuccess -> ThrowIfFailed --- release_notes.md | 13 ++----- .../RemoteEntityContext.cs | 38 ++++++++++++++++--- .../RemoteOrchestratorContext.cs | 34 ++++++++++++++--- .../Exceptions/EntityFailureException.cs | 16 ++++++++ .../OutOfProcMiddleware.cs | 9 +++-- 5 files changed, 88 insertions(+), 22 deletions(-) create mode 100644 src/WebJobs.Extensions.DurableTask/Exceptions/EntityFailureException.cs diff --git a/release_notes.md b/release_notes.md index 6738d3ab6..6f5e0c462 100644 --- a/release_notes.md +++ b/release_notes.md @@ -1,6 +1,6 @@ # Release Notes -## Microsoft.Azure.Functions.Worker.Extensions.DurableTask v1.1.0-preview.2 +## Microsoft.Azure.Functions.Worker.Extensions.DurableTask ### New Features @@ -10,20 +10,15 @@ ### Dependency Updates -`Microsoft.DurableTask.*` to `1.1.0-preview.2` - -## Microsoft.Azure.WebJobs.Extensions.DurableTask v2.12.0-preview.1 +## Microsoft.Azure.WebJobs.Extensions.DurableTask ### New Features -- Updates to take advantage of new core-entity support - ### Bug Fixes +- Fix failed orchestration/entities not showing up as function invocation failures. + ### Breaking Changes ### Dependency Updates -`Microsoft.Azure.DurableTask.Core` to `2.16.0-preview.2` -`Microsoft.Azure.DurableTask.AzureStorage` to `1.16.0-preview.2` - diff --git a/src/WebJobs.Extensions.DurableTask/ContextImplementations/RemoteEntityContext.cs b/src/WebJobs.Extensions.DurableTask/ContextImplementations/RemoteEntityContext.cs index 07ea0e921..c2bf96369 100644 --- a/src/WebJobs.Extensions.DurableTask/ContextImplementations/RemoteEntityContext.cs +++ b/src/WebJobs.Extensions.DurableTask/ContextImplementations/RemoteEntityContext.cs @@ -3,12 +3,9 @@ #nullable enable using System; using System.Collections.Generic; -using DurableTask.Core; -using DurableTask.Core.Command; +using DurableTask.Core.Entities; using DurableTask.Core.Entities.OperationFormat; -using DurableTask.Core.History; using Newtonsoft.Json; -using Newtonsoft.Json.Linq; namespace Microsoft.Azure.WebJobs.Extensions.DurableTask { @@ -20,9 +17,40 @@ public RemoteEntityContext(EntityBatchRequest batchRequest) } [JsonProperty("request")] - public EntityBatchRequest Request { get; private set; } + internal EntityBatchRequest Request { get; private set; } [JsonIgnore] internal EntityBatchResult? Result { get; set; } + + internal void ThrowIfFailed() + { + if (this.Result == null) + { + throw new InvalidOperationException("Entity batch request has not been processed yet."); + } + + if (this.Result.FailureDetails is { } f) + { + throw new EntityFailureException(f.ErrorMessage); + } + + List? errors = null; + if (this.Result.Results is not null) + { + foreach (OperationResult result in this.Result.Results) + { + if (result.FailureDetails is { } failure) + { + errors ??= new List(); + errors.Add(new EntityFailureException(failure.ErrorMessage)); + } + } + } + + if (errors is not null) + { + throw errors.Count == 1 ? errors[0] : new AggregateException(errors); + } + } } } diff --git a/src/WebJobs.Extensions.DurableTask/ContextImplementations/RemoteOrchestratorContext.cs b/src/WebJobs.Extensions.DurableTask/ContextImplementations/RemoteOrchestratorContext.cs index a1d9319e3..8cc630bf8 100644 --- a/src/WebJobs.Extensions.DurableTask/ContextImplementations/RemoteOrchestratorContext.cs +++ b/src/WebJobs.Extensions.DurableTask/ContextImplementations/RemoteOrchestratorContext.cs @@ -6,6 +6,7 @@ using DurableTask.Core; using DurableTask.Core.Command; using DurableTask.Core.Entities; +using DurableTask.Core.Exceptions; using DurableTask.Core.History; using Newtonsoft.Json; using Newtonsoft.Json.Linq; @@ -18,6 +19,8 @@ internal class RemoteOrchestratorContext private OrchestratorExecutionResult? executionResult; + private Exception? failure; + public RemoteOrchestratorContext(OrchestrationRuntimeState runtimeState, TaskOrchestrationEntityParameters? entityParameters) { this.runtimeState = runtimeState ?? throw new ArgumentNullException(nameof(runtimeState)); @@ -48,6 +51,19 @@ public RemoteOrchestratorContext(OrchestrationRuntimeState runtimeState, TaskOrc [JsonIgnore] internal TaskOrchestrationEntityParameters? EntityParameters { get; private set; } + internal void ThrowIfFailed() + { + if (this.failure != null) + { + throw this.failure; + } + } + + internal OrchestratorExecutionResult GetResult() + { + return this.executionResult ?? throw new InvalidOperationException($"The execution result has not yet been set using {nameof(this.SetResult)}."); + } + internal void SetResult(IEnumerable actions, string customStatus) { var result = new OrchestratorExecutionResult @@ -107,16 +123,24 @@ private void SetResultInternal(OrchestratorExecutionResult result) this.OrchestratorCompleted = true; this.SerializedOutput = completeAction.Result; this.ContinuedAsNew = completeAction.OrchestrationStatus == OrchestrationStatus.ContinuedAsNew; + + if (completeAction.OrchestrationStatus == OrchestrationStatus.Failed) + { + string message = completeAction switch + { + { FailureDetails: { } f } => f.ErrorMessage, + { Result: { } r } => r, + _ => "Exception occurred during orchestration execution.", + }; + + this.failure = new OrchestrationFailureException(message); + } + break; } } this.executionResult = result; } - - internal OrchestratorExecutionResult GetResult() - { - return this.executionResult ?? throw new InvalidOperationException($"The execution result has not yet been set using {nameof(this.SetResult)}."); - } } } diff --git a/src/WebJobs.Extensions.DurableTask/Exceptions/EntityFailureException.cs b/src/WebJobs.Extensions.DurableTask/Exceptions/EntityFailureException.cs new file mode 100644 index 000000000..8c455f85a --- /dev/null +++ b/src/WebJobs.Extensions.DurableTask/Exceptions/EntityFailureException.cs @@ -0,0 +1,16 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See LICENSE in the project root for license information. + +using System; + +// TODO: move to DurableTask.Core if needed. +namespace DurableTask.Core.Entities +{ + internal class EntityFailureException : Exception + { + public EntityFailureException(string message) + : base(message) + { + } + } +} diff --git a/src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs b/src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs index 65751729b..ac7040b2f 100644 --- a/src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs +++ b/src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs @@ -14,7 +14,7 @@ using DurableTask.Core.History; using DurableTask.Core.Middleware; using Microsoft.Azure.WebJobs.Host.Executors; -using Newtonsoft.Json; +using P = Microsoft.DurableTask.Protobuf; namespace Microsoft.Azure.WebJobs.Extensions.DurableTask { @@ -137,10 +137,12 @@ await this.LifeCycleNotificationHelper.OrchestratorStartingAsync( } byte[] triggerReturnValueBytes = Convert.FromBase64String(triggerReturnValue); - var response = Microsoft.DurableTask.Protobuf.OrchestratorResponse.Parser.ParseFrom(triggerReturnValueBytes); + P.OrchestratorResponse response = P.OrchestratorResponse.Parser.ParseFrom(triggerReturnValueBytes); context.SetResult( response.Actions.Select(ProtobufUtils.ToOrchestratorAction), response.CustomStatus); + + context.ThrowIfFailed(); }, #pragma warning restore CS0618 // Type or member is obsolete (not intended for general public use) }; @@ -326,9 +328,10 @@ void SetErrorResult(FailureDetails failureDetails) } byte[] triggerReturnValueBytes = Convert.FromBase64String(triggerReturnValue); - var response = Microsoft.DurableTask.Protobuf.EntityBatchResult.Parser.ParseFrom(triggerReturnValueBytes); + P.EntityBatchResult response = P.EntityBatchResult.Parser.ParseFrom(triggerReturnValueBytes); context.Result = response.ToEntityBatchResult(); + context.ThrowIfFailed(); #pragma warning restore CS0618 // Type or member is obsolete (not intended for general public use) }, }; From f02cdd6bb7e2aa25c12da0ebfdee536384d6942f Mon Sep 17 00:00:00 2001 From: Jacob Viau Date: Thu, 9 Nov 2023 13:47:58 -0800 Subject: [PATCH 6/8] Fix unintentional input unwrapping for OOP workers (#2656) * Fix unintentional input unwrapping for OOP workers * Update release notes * Refine fix to only apply to dotnet-isolated and java --- release_notes.md | 4 ++-- .../ContextImplementations/DurableActivityContext.cs | 9 ++++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/release_notes.md b/release_notes.md index 6f5e0c462..5d4942399 100644 --- a/release_notes.md +++ b/release_notes.md @@ -16,9 +16,9 @@ ### Bug Fixes -- Fix failed orchestration/entities not showing up as function invocation failures. +- Fix issue where json token input (not a json object) was unwrapped before sending to an out-of-proc worker. This could then lead to deserialization issues as the wrapping quotes were missing. (Applies to dotnet-isolated and java only) +- Fix failed orchestration/entities not showing up as function invocation failures. (Applies to dotnet-isolated and java only) ### Breaking Changes ### Dependency Updates - diff --git a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableActivityContext.cs b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableActivityContext.cs index f0812bc64..560fedf48 100644 --- a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableActivityContext.cs +++ b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableActivityContext.cs @@ -20,6 +20,7 @@ internal class DurableActivityContext : IDurableActivityContext, private readonly string instanceId; private readonly MessagePayloadDataConverter messageDataConverter; private readonly bool inputsAreArrays; + private readonly bool rawInput; private JToken parsedJsonInput; private string serializedOutput; @@ -35,6 +36,9 @@ internal DurableActivityContext(DurableTaskExtension config, string instanceId, this.inputsAreArrays = config.OutOfProcProtocol == OutOfProcOrchestrationProtocol.OrchestratorShim || config.PlatformInformationService.GetWorkerRuntimeType() == WorkerRuntimeType.DotNetIsolated; + + // Do not manipulate JSON input when using middleware passthrough. + this.rawInput = config.OutOfProcProtocol == OutOfProcOrchestrationProtocol.MiddlewarePassthrough; } /// @@ -116,8 +120,7 @@ internal object GetInput(Type destinationType) return null; } - var value = jToken as JValue; - if (value != null) + if (!this.rawInput && jToken is JValue value) { return value.ToObject(destinationType); } @@ -129,7 +132,7 @@ internal object GetInput(Type destinationType) // MessagePayloadDataConverter to throw an exception. This is a workaround for that case. All other // inputs with destination System.String (in-proc: JSON and not JSON; out-of-proc: not-JSON) inputs with // destination System.String should cast to JValues and be handled above.) - if (destinationType.Equals(typeof(string))) + if (this.rawInput) { return serializedValue; } From 392fde47d1269ef3e7d3330b776e4012f3f31f6f Mon Sep 17 00:00:00 2001 From: Varshitha Bachu Date: Mon, 13 Nov 2023 10:35:39 -0800 Subject: [PATCH 7/8] HTTP APIs for .NET Isolated Parity (#2653) This PR adds HTTP API parity (CallHttpAsync()) for .NET Isolated. Changes include: New DurableHttpRequest and DurableHttpResponse types in the Worker project New TaskOrchestrationContext extension method for CallHttpAsync() Additional changes in OutOfProcMiddleware to execute the BuiltIn::HttpActivity activity function by using the existing TaskHttpActivityShim (BuiltIn::HttpActivity is the reserved name to know when a TaskActivity should be an HTTP activity). --- .../OutOfProcMiddleware.cs | 6 ++ .../Constants.cs | 2 + .../HTTP/DurableHttpRequest.cs | 72 ++++++++++++++++ .../HTTP/DurableHttpResponse.cs | 43 ++++++++++ .../HTTP/HttpHeadersConverter.cs | 84 +++++++++++++++++++ .../HTTP/HttpMethodConverter.cs | 33 ++++++++ .../HTTP/HttpRetryOptions.cs | 81 ++++++++++++++++++ ...askOrchestrationContextExtensionMethods.cs | 52 ++++++++++++ .../Worker.Extensions.DurableTask.csproj | 1 - 9 files changed, 373 insertions(+), 1 deletion(-) create mode 100644 src/Worker.Extensions.DurableTask/HTTP/DurableHttpRequest.cs create mode 100644 src/Worker.Extensions.DurableTask/HTTP/DurableHttpResponse.cs create mode 100644 src/Worker.Extensions.DurableTask/HTTP/HttpHeadersConverter.cs create mode 100644 src/Worker.Extensions.DurableTask/HTTP/HttpMethodConverter.cs create mode 100644 src/Worker.Extensions.DurableTask/HTTP/HttpRetryOptions.cs create mode 100644 src/Worker.Extensions.DurableTask/TaskOrchestrationContextExtensionMethods.cs diff --git a/src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs b/src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs index ac7040b2f..cf45c2521 100644 --- a/src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs +++ b/src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs @@ -419,6 +419,12 @@ public async Task CallActivityAsync(DispatchMiddlewareContext dispatchContext, F throw new InvalidOperationException($"An activity was scheduled but no {nameof(TaskScheduledEvent)} was found!"); } + if (scheduledEvent.Name?.StartsWith("BuiltIn::", StringComparison.OrdinalIgnoreCase) ?? false) + { + await next(); + return; + } + FunctionName functionName = new FunctionName(scheduledEvent.Name); OrchestrationInstance? instance = dispatchContext.GetProperty(); diff --git a/src/Worker.Extensions.DurableTask/Constants.cs b/src/Worker.Extensions.DurableTask/Constants.cs index 6f3c3cd41..e94200bef 100644 --- a/src/Worker.Extensions.DurableTask/Constants.cs +++ b/src/Worker.Extensions.DurableTask/Constants.cs @@ -9,4 +9,6 @@ internal static class Constants public const string IllegalAwaitErrorMessage = "An invalid asynchronous invocation was detected. This can be caused by awaiting non-durable tasks " + "in an orchestrator function's implementation or by middleware that invokes asynchronous code."; + + public const string HttpTaskActivityReservedName = "BuiltIn::HttpActivity"; } \ No newline at end of file diff --git a/src/Worker.Extensions.DurableTask/HTTP/DurableHttpRequest.cs b/src/Worker.Extensions.DurableTask/HTTP/DurableHttpRequest.cs new file mode 100644 index 000000000..333c74cfd --- /dev/null +++ b/src/Worker.Extensions.DurableTask/HTTP/DurableHttpRequest.cs @@ -0,0 +1,72 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Net.Http; +using System.Text.Json.Serialization; +using Microsoft.Extensions.Primitives; + +namespace Microsoft.Azure.Functions.Worker.Extensions.DurableTask.Http; + +/// +/// Request used to make an HTTP call through Durable Functions. +/// +public class DurableHttpRequest +{ + /// + /// Initializes a new instance of the class. + /// + public DurableHttpRequest(HttpMethod method, Uri uri) + { + this.Method = method; + this.Uri = uri; + } + + /// + /// HttpMethod used in the HTTP request made by the Durable Function. + /// + [JsonPropertyName("method")] + [JsonConverter(typeof(HttpMethodConverter))] + public HttpMethod Method { get; } + + /// + /// Uri used in the HTTP request made by the Durable Function. + /// + [JsonPropertyName("uri")] + public Uri Uri { get; } + + /// + /// Headers passed with the HTTP request made by the Durable Function. + /// + [JsonPropertyName("headers")] + [JsonConverter(typeof(HttpHeadersConverter))] + public IDictionary? Headers { get; set; } + + /// + /// Content passed with the HTTP request made by the Durable Function. + /// + [JsonPropertyName("content")] + public string? Content { get; set; } + + /// + /// Specifies whether the Durable HTTP APIs should automatically + /// handle the asynchronous HTTP pattern. + /// + [JsonPropertyName("asynchronousPatternEnabled")] + public bool AsynchronousPatternEnabled { get; set; } + + /// + /// Defines retry policy for handling of failures in making the HTTP Request. These could be non-successful HTTP status codes + /// in the response, a timeout in making the HTTP call, or an exception raised from the HTTP Client library. + /// + [JsonPropertyName("retryOptions")] + public HttpRetryOptions? HttpRetryOptions { get; set; } + + /// + /// The total timeout for the original HTTP request and any + /// asynchronous polling. + /// + [JsonPropertyName("timeout")] + public TimeSpan? Timeout { get; set; } +} \ No newline at end of file diff --git a/src/Worker.Extensions.DurableTask/HTTP/DurableHttpResponse.cs b/src/Worker.Extensions.DurableTask/HTTP/DurableHttpResponse.cs new file mode 100644 index 000000000..06875d9e4 --- /dev/null +++ b/src/Worker.Extensions.DurableTask/HTTP/DurableHttpResponse.cs @@ -0,0 +1,43 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System.Collections.Generic; +using System.Net; +using System.Text.Json.Serialization; +using Microsoft.Extensions.Primitives; + +namespace Microsoft.Azure.Functions.Worker.Extensions.DurableTask.Http; + +/// +/// Response received from the HTTP request made by the Durable Function. +/// +public class DurableHttpResponse +{ + /// + /// Initializes a new instance of the class. + /// + /// HTTP Status code returned from the HTTP call. + public DurableHttpResponse(HttpStatusCode statusCode) + { + this.StatusCode = statusCode; + } + + /// + /// Status code returned from an HTTP request. + /// + [JsonPropertyName("statusCode")] + public HttpStatusCode StatusCode { get; } + + /// + /// Headers in the response from an HTTP request. + /// + [JsonPropertyName("headers")] + [JsonConverter(typeof(HttpHeadersConverter))] + public IDictionary? Headers { get; init; } + + /// + /// Content returned from an HTTP request. + /// + [JsonPropertyName("content")] + public string? Content { get; init; } +} \ No newline at end of file diff --git a/src/Worker.Extensions.DurableTask/HTTP/HttpHeadersConverter.cs b/src/Worker.Extensions.DurableTask/HTTP/HttpHeadersConverter.cs new file mode 100644 index 000000000..d41acef5e --- /dev/null +++ b/src/Worker.Extensions.DurableTask/HTTP/HttpHeadersConverter.cs @@ -0,0 +1,84 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See LICENSE in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Text.Json; +using System.Text.Json.Serialization; +using Microsoft.Extensions.Primitives; + +namespace Microsoft.Azure.Functions.Worker.Extensions.DurableTask.Http; + +// StringValues does not deserialize as you would expect, so we need a custom mechanism +// for serializing HTTP header collections +internal class HttpHeadersConverter : JsonConverter> +{ + public override IDictionary Read( + ref Utf8JsonReader reader, + Type objectType, + JsonSerializerOptions options) + { + var headers = new Dictionary(StringComparer.OrdinalIgnoreCase); + + if (reader.TokenType != JsonTokenType.StartObject) + { + return headers; + } + + var valueList = new List(); + while (reader.Read() && reader.TokenType != JsonTokenType.EndObject) + { + string? propertyName = reader.GetString(); + + reader.Read(); + + // Header values can be either individual strings or string arrays + StringValues values = default(StringValues); + if (reader.TokenType == JsonTokenType.String) + { + values = new StringValues(reader.GetString()); + } + else if (reader.TokenType == JsonTokenType.StartArray) + { + while (reader.Read() && reader.TokenType != JsonTokenType.EndArray) + { + valueList.Add(reader.GetString()); + } + + values = new StringValues(valueList.ToArray()); + valueList.Clear(); + } + + headers[propertyName] = values; + } + + return headers; + } + + public override void Write( + Utf8JsonWriter writer, + IDictionary value, + JsonSerializerOptions options) + { + writer.WriteStartObject(); + + var headers = (IDictionary)value; + foreach (var pair in headers) + { + if (pair.Value.Count == 1) + { + // serialize as a single string value + writer.WriteString(pair.Key, pair.Value[0]); + } + else + { + // serializes as an array + writer.WriteStartArray(pair.Key); + writer.WriteStringValue(pair.Value); + writer.WriteEndArray(); + } + } + + writer.WriteEndObject(); + } +} \ No newline at end of file diff --git a/src/Worker.Extensions.DurableTask/HTTP/HttpMethodConverter.cs b/src/Worker.Extensions.DurableTask/HTTP/HttpMethodConverter.cs new file mode 100644 index 000000000..540f1f981 --- /dev/null +++ b/src/Worker.Extensions.DurableTask/HTTP/HttpMethodConverter.cs @@ -0,0 +1,33 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Net.Http; +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace Microsoft.Azure.Functions.Worker.Extensions.DurableTask.Http; + +internal class HttpMethodConverter : JsonConverter +{ + public override bool CanConvert(Type objectType) + { + return typeof(HttpMethod).IsAssignableFrom(objectType); + } + + public override HttpMethod Read( + ref Utf8JsonReader reader, + Type objectType, + JsonSerializerOptions options) + { + return new HttpMethod(reader.GetString()); + } + + public override void Write( + Utf8JsonWriter writer, + HttpMethod value, + JsonSerializerOptions options) + { + writer.WriteStringValue(value.ToString()); + } +} diff --git a/src/Worker.Extensions.DurableTask/HTTP/HttpRetryOptions.cs b/src/Worker.Extensions.DurableTask/HTTP/HttpRetryOptions.cs new file mode 100644 index 000000000..959f7e047 --- /dev/null +++ b/src/Worker.Extensions.DurableTask/HTTP/HttpRetryOptions.cs @@ -0,0 +1,81 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Net; +using System.Text.Json.Serialization; + +namespace Microsoft.Azure.Functions.Worker.Extensions.DurableTask.Http; + +/// +/// Defines retry policies that can be passed as parameters to various operations. +/// +public class HttpRetryOptions +{ + // Would like to make this durability provider specific, but since this is a developer + // facing type, that is difficult. + private static readonly TimeSpan DefaultMaxRetryinterval = TimeSpan.FromDays(6); + + /// + /// Creates a new instance SerializableRetryOptions with the supplied first retry and max attempts. + /// + public HttpRetryOptions(IList? statusCodesToRetry = null) + { + this.StatusCodesToRetry = statusCodesToRetry ?? new List(); + } + + /// + /// Gets or sets the first retry interval. + /// + /// + /// The TimeSpan to wait for the first retries. + /// + [JsonPropertyName("FirstRetryInterval")] + public TimeSpan FirstRetryInterval { get; set; } + + /// + /// Gets or sets the max retry interval. + /// + /// + /// The TimeSpan of the max retry interval, defaults to 6 days. + /// + [JsonPropertyName("MaxRetryInterval")] + public TimeSpan MaxRetryInterval { get; set; } = DefaultMaxRetryinterval; + + /// + /// Gets or sets the backoff coefficient. + /// + /// + /// The backoff coefficient used to determine rate of increase of backoff. Defaults to 1. + /// + [JsonPropertyName("BackoffCoefficient")] + public double BackoffCoefficient { get; set; } = 1; + + /// + /// Gets or sets the timeout for retries. + /// + /// + /// The TimeSpan timeout for retries, defaults to . + /// + [JsonPropertyName("RetryTimeout")] + public TimeSpan RetryTimeout { get; set; } = TimeSpan.MaxValue; + + /// + /// Gets or sets the max number of attempts. + /// + /// + /// The maximum number of retry attempts. + /// + [JsonPropertyName("MaxNumberOfAttempts")] + public int MaxNumberOfAttempts { get; set; } + + /// + /// Gets or sets the list of status codes upon which the + /// retry logic specified by this object shall be triggered. + /// If none are provided, all 4xx and 5xx status codes + /// will be retried. + /// + [JsonPropertyName("StatusCodesToRetry")] + public IList StatusCodesToRetry { get; } +} diff --git a/src/Worker.Extensions.DurableTask/TaskOrchestrationContextExtensionMethods.cs b/src/Worker.Extensions.DurableTask/TaskOrchestrationContextExtensionMethods.cs new file mode 100644 index 000000000..c5523d002 --- /dev/null +++ b/src/Worker.Extensions.DurableTask/TaskOrchestrationContextExtensionMethods.cs @@ -0,0 +1,52 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Net.Http; +using System.Threading.Tasks; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask.Http; + +namespace Microsoft.DurableTask; + +/// +/// Extensions for . +/// +public static class TaskOrchestrationContextExtensionMethods +{ + /// + /// Makes an HTTP call using the information in the DurableHttpRequest. + /// + /// The task orchestration context. + /// The DurableHttpRequest used to make the HTTP call. + /// DurableHttpResponse + public static Task CallHttpAsync(this TaskOrchestrationContext context, DurableHttpRequest request) + { + if (context is null) + { + throw new ArgumentNullException(nameof(context)); + } + + return context.CallActivityAsync(Constants.HttpTaskActivityReservedName, request); + } + + /// + /// Makes an HTTP call to the specified uri. + /// + /// The task orchestration context. + /// HttpMethod used for api call. + /// uri used to make the HTTP call. + /// Content passed in the HTTP request. + /// The retry option for the HTTP task. + /// A Result of the HTTP call. + public static Task CallHttpAsync(this TaskOrchestrationContext context, HttpMethod method, Uri uri, string? content = null, HttpRetryOptions? retryOptions = null) + { + DurableHttpRequest request = new DurableHttpRequest(method, uri) + { + Content = content, + HttpRetryOptions = retryOptions, + }; + + return context.CallHttpAsync(request); + } +} diff --git a/src/Worker.Extensions.DurableTask/Worker.Extensions.DurableTask.csproj b/src/Worker.Extensions.DurableTask/Worker.Extensions.DurableTask.csproj index 3f63320c5..c7b7d1918 100644 --- a/src/Worker.Extensions.DurableTask/Worker.Extensions.DurableTask.csproj +++ b/src/Worker.Extensions.DurableTask/Worker.Extensions.DurableTask.csproj @@ -54,5 +54,4 @@ content/SBOM - From 6c104220a797007991a8079ca552aecdc7017ce8 Mon Sep 17 00:00:00 2001 From: David Justo Date: Mon, 13 Nov 2023 11:46:59 -0800 Subject: [PATCH 8/8] Remove preview suffixes (#2666) Updated dependencies for v2.13.0 release --- .../WebJobs.Extensions.DurableTask.csproj | 8 ++++---- src/Worker.Extensions.DurableTask/AssemblyInfo.cs | 2 +- .../Worker.Extensions.DurableTask.csproj | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj b/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj index f3ebdb994..b8c080a39 100644 --- a/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj +++ b/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj @@ -7,7 +7,7 @@ 2 13 0 - $(MajorVersion).$(MinorVersion).$(PatchVersion)-preview.1 + $(MajorVersion).$(MinorVersion).$(PatchVersion) $(MajorVersion).$(MinorVersion).$(PatchVersion) $(MajorVersion).0.0.0 Microsoft Corporation @@ -99,14 +99,14 @@ - + - - + + diff --git a/src/Worker.Extensions.DurableTask/AssemblyInfo.cs b/src/Worker.Extensions.DurableTask/AssemblyInfo.cs index ecf1d0ed5..221171b49 100644 --- a/src/Worker.Extensions.DurableTask/AssemblyInfo.cs +++ b/src/Worker.Extensions.DurableTask/AssemblyInfo.cs @@ -4,4 +4,4 @@ using Microsoft.Azure.Functions.Worker.Extensions.Abstractions; // TODO: Find a way to generate this dynamically at build-time -[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.DurableTask", "2.13.0-preview.1")] +[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.DurableTask", "2.13.0")] diff --git a/src/Worker.Extensions.DurableTask/Worker.Extensions.DurableTask.csproj b/src/Worker.Extensions.DurableTask/Worker.Extensions.DurableTask.csproj index c7b7d1918..bdb53e58d 100644 --- a/src/Worker.Extensions.DurableTask/Worker.Extensions.DurableTask.csproj +++ b/src/Worker.Extensions.DurableTask/Worker.Extensions.DurableTask.csproj @@ -39,8 +39,8 @@ - - + +