Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement CreateHttpManagementPayload API in Durable Worker Extension #2929

Merged
merged 15 commits into from
Oct 23, 2024
Merged
9 changes: 9 additions & 0 deletions src/WebJobs.Extensions.DurableTask/Bindings/BindingHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public string DurableOrchestrationClientToString(IDurableOrchestrationClient cli
ConnectionName = attr.ConnectionName,
RpcBaseUrl = localRpcAddress,
RequiredQueryStringParameters = this.config.HttpApiHandler.GetUniversalQueryStrings(),
HttpBaseUrl = this.config.HttpApiHandler.GetBaseUrl(),
});
}

Expand Down Expand Up @@ -130,6 +131,14 @@ private class OrchestrationClientInputData
/// </summary>
[JsonProperty("rpcBaseUrl")]
public string? RpcBaseUrl { get; set; }

/// <summary>
/// The base URL of the Azure Functions host, used in the out-of-proc model.
/// This URL is sent by the client binding object to the Durable Worker extension,
/// allowing the extension to know the host's base URL for constructing management URLs.
/// </summary>
[JsonProperty("httpBaseUrl")]
public string? HttpBaseUrl { get; set; }
}
}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public ValueTask<ConversionResult> ConvertAsync(ConverterContext context)
}

DurableTaskClient client = this.clientProvider.GetClient(endpoint, inputData?.taskHubName, inputData?.connectionName);
client = new FunctionsDurableTaskClient(client, inputData!.requiredQueryStringParameters);
client = new FunctionsDurableTaskClient(client, inputData!.requiredQueryStringParameters, inputData!.httpBaseUrl);
return new ValueTask<ConversionResult>(ConversionResult.Success(client));
}
catch (Exception innerException)
Expand All @@ -62,5 +62,5 @@ public ValueTask<ConversionResult> ConvertAsync(ConverterContext context)
}

// Serializer is case-sensitive and incoming JSON properties are camel-cased.
private record DurableClientInputData(string rpcBaseUrl, string taskHubName, string connectionName, string requiredQueryStringParameters);
private record DurableClientInputData(string rpcBaseUrl, string taskHubName, string connectionName, string requiredQueryStringParameters, string httpBaseUrl);
}
79 changes: 65 additions & 14 deletions src/Worker.Extensions.DurableTask/DurableTaskClientExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,30 @@ public static HttpResponseData CreateCheckStatusResponse(
return response;
}

private static object SetHeadersAndGetPayload(
DurableTaskClient client, HttpRequestData request, HttpResponseData response, string instanceId)
/// <summary>
/// Creates an HTTP management payload for the specified orchestration instance.
/// </summary>
/// <param name="client">The <see cref="DurableTaskClient"/>.</param>
/// <param name="instanceId">The ID of the orchestration instance.</param>
/// <param name="request">Optional HTTP request data to use for creating the base URL.</param>
/// <returns>An object containing instance control URLs.</returns>
/// <exception cref="ArgumentException">Thrown when instanceId is null or empty.</exception>
/// <exception cref="InvalidOperationException">Thrown when a valid base URL cannot be determined.</exception>
public static HttpManagementPayload CreateHttpManagementPayload(
this DurableTaskClient client,
string instanceId,
HttpRequestData? request = null)
{
if (string.IsNullOrEmpty(instanceId))
{
throw new ArgumentException("InstanceId cannot be null or empty.", nameof(instanceId));
}

return SetHeadersAndGetPayload(client, request, null, instanceId);
}

private static HttpManagementPayload SetHeadersAndGetPayload(
DurableTaskClient client, HttpRequestData? request, HttpResponseData? response, string instanceId)
{
static string BuildUrl(string url, params string?[] queryValues)
{
Expand All @@ -143,22 +165,46 @@ static string BuildUrl(string url, params string?[] queryValues)
// request headers into consideration and generate the base URL accordingly.
// More info: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Forwarded.
// One potential workaround is to set ASPNETCORE_FORWARDEDHEADERS_ENABLED to true.
string baseUrl = request.Url.GetLeftPart(UriPartial.Authority);

// If HttpRequestData is provided, use its URL; otherwise, get the baseUrl from the DurableTaskClient.
// The base URL could be null if:
// 1. The DurableTaskClient isn't a FunctionsDurableTaskClient (which would have the baseUrl from bindings)
// 2. There's no valid HttpRequestData provided
string? baseUrl = ((request != null) ? request.Url.GetLeftPart(UriPartial.Authority) : GetBaseUrl(client));

if (baseUrl == null)
{
throw new InvalidOperationException("Failed to create HTTP management payload as base URL is null. Either use Functions bindings or provide an HTTP request to create the HttpPayload.");
}

bool isFromRequest = request != null;

string formattedInstanceId = Uri.EscapeDataString(instanceId);
string instanceUrl = $"{baseUrl}/runtime/webhooks/durabletask/instances/{formattedInstanceId}";

// The baseUrl differs depending on the source. Eg:
// - From request: http://localhost:7071/
nytian marked this conversation as resolved.
Show resolved Hide resolved
// - From durable client: http://localhost:7071/runtime/webhooks/durabletask
// We adjust the instanceUrl construction accordingly.
string instanceUrl = isFromRequest
? $"{baseUrl}/runtime/webhooks/durabletask/instances/{formattedInstanceId}"
: $"{baseUrl}/instances/{formattedInstanceId}";
string? commonQueryParameters = GetQueryParams(client);
response.Headers.Add("Location", BuildUrl(instanceUrl, commonQueryParameters));
response.Headers.Add("Content-Type", "application/json");

if (response != null)
{
response.Headers.Add("Location", BuildUrl(instanceUrl, commonQueryParameters));
response.Headers.Add("Content-Type", "application/json");
}
Comment on lines -150 to +197
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is response possibly null now?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SetHeadersAndGetPayload is also invoked by the existing method CreateCheckStatusResponse, which provides an HTTP response to SetHeadersAndGetPayload. In the new API, CreateHttpManagementPayload, there won’t be an HTTP response parameter. Therefore, when SetHeadersAndGetPayload is called by CreateHttpManagementPayload, the response will be null; however, if it’s called by CreateCheckStatusResponse, the response will not be null.


return new
return new HttpManagementPayload
{
id = instanceId,
purgeHistoryDeleteUri = BuildUrl(instanceUrl, commonQueryParameters),
sendEventPostUri = BuildUrl($"{instanceUrl}/raiseEvent/{{eventName}}", commonQueryParameters),
statusQueryGetUri = BuildUrl(instanceUrl, commonQueryParameters),
terminatePostUri = BuildUrl($"{instanceUrl}/terminate", "reason={{text}}", commonQueryParameters),
suspendPostUri = BuildUrl($"{instanceUrl}/suspend", "reason={{text}}", commonQueryParameters),
resumePostUri = BuildUrl($"{instanceUrl}/resume", "reason={{text}}", commonQueryParameters)
Id = instanceId,
PurgeHistoryDeleteUri = BuildUrl(instanceUrl, commonQueryParameters),
SendEventPostUri = BuildUrl($"{instanceUrl}/raiseEvent/{{eventName}}", commonQueryParameters),
StatusQueryGetUri = BuildUrl(instanceUrl, commonQueryParameters),
TerminatePostUri = BuildUrl($"{instanceUrl}/terminate", "reason={{text}}", commonQueryParameters),
SuspendPostUri = BuildUrl($"{instanceUrl}/suspend", "reason={{text}}", commonQueryParameters),
ResumePostUri = BuildUrl($"{instanceUrl}/resume", "reason={{text}}", commonQueryParameters)
};
}

Expand All @@ -172,4 +218,9 @@ private static ObjectSerializer GetObjectSerializer(HttpResponseData response)
{
return client is FunctionsDurableTaskClient functions ? functions.QueryString : null;
}

private static string? GetBaseUrl(DurableTaskClient client)
{
return client is FunctionsDurableTaskClient functions ? functions.HttpBaseUrl : null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@ internal sealed class FunctionsDurableTaskClient : DurableTaskClient
{
private readonly DurableTaskClient inner;

public FunctionsDurableTaskClient(DurableTaskClient inner, string? queryString)
public FunctionsDurableTaskClient(DurableTaskClient inner, string? queryString, string? httpBaseUrl)
: base(inner.Name)
{
this.inner = inner;
this.QueryString = queryString;
this.HttpBaseUrl = httpBaseUrl;
}

public string? QueryString { get; }

public string? HttpBaseUrl { get; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it allowed for this string to ever be null? If not, I'd recommend removing the ? from the string type.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the DurableTaskClient isn’t an instance of FunctionsDurableTaskClient, then HttpBaseUrl could be null, as it’s only an attribute of FunctionsDurableTaskClient. I’m not entirely sure in what scenario the DurableTaskClient wouldn’t be a FunctionsDurableTaskClient—perhaps it could happen if bindings aren’t being used. That’s why I added an exception for cases where the base URL creation fails.

public override DurableEntityClient Entities => this.inner.Entities;

public override ValueTask DisposeAsync()
Expand Down
97 changes: 97 additions & 0 deletions src/Worker.Extensions.DurableTask/HttpManagementPayload.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
// This is a copy of: https://github.com/Azure/azure-functions-durable-extension/blob/dev/src/WebJobs.Extensions.DurableTask/HttpManagementPayload.cs

nytian marked this conversation as resolved.
Show resolved Hide resolved
using System;
using System.Collections.Generic;
using System.Text;
using Newtonsoft.Json;

namespace Microsoft.Azure.Functions.Worker;
nytian marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// Data structure containing status, terminate and send external event HTTP endpoints.
/// </summary>
public class HttpManagementPayload
{
/// <summary>
/// Gets the ID of the orchestration instance.
/// </summary>
/// <value>
/// The ID of the orchestration instance.
/// </value>
[JsonProperty("id")]
public string? Id { get; internal set; }

/// <summary>
/// Gets the HTTP GET status query endpoint URL.
/// </summary>
/// <value>
/// The HTTP URL for fetching the instance status.
/// </value>
[JsonProperty("statusQueryGetUri")]
public string? StatusQueryGetUri { get; internal set; }

/// <summary>
/// Gets the HTTP POST external event sending endpoint URL.
/// </summary>
/// <value>
/// The HTTP URL for posting external event notifications.
/// </value>
[JsonProperty("sendEventPostUri")]
public string? SendEventPostUri { get; internal set; }

/// <summary>
/// Gets the HTTP POST instance termination endpoint.
/// </summary>
/// <value>
/// The HTTP URL for posting instance termination commands.
/// </value>
[JsonProperty("terminatePostUri")]
public string? TerminatePostUri { get; internal set; }

/// <summary>
/// Gets the HTTP POST instance rewind endpoint.
/// </summary>
/// <value>
/// The HTTP URL for rewinding orchestration instances.
/// </value>
[JsonProperty("rewindPostUri")]
public string? RewindPostUri { get; internal set; }

/// <summary>
/// Gets the HTTP DELETE purge instance history by instance ID endpoint.
/// </summary>
/// <value>
/// The HTTP URL for purging instance history by instance ID.
/// </value>
[JsonProperty("purgeHistoryDeleteUri")]
public string? PurgeHistoryDeleteUri { get; internal set; }

/// <summary>
/// Gets the HTTP POST instance restart endpoint.
/// </summary>
/// <value>
/// The HTTP URL for restarting an orchestration instance.
/// </value>
[JsonProperty("restartPostUri")]
public string? RestartPostUri { get; internal set; }

/// <summary>
/// Gets the HTTP POST instance suspend endpoint.
/// </summary>
/// <value>
/// The HTTP URL for suspending an orchestration instance.
/// </value>
[JsonProperty("suspendPostUri")]
public string? SuspendPostUri { get; internal set; }

/// <summary>
/// Gets the HTTP POST instance resume endpoint.
/// </summary>
/// <value>
/// The HTTP URL for resuming an orchestration instance.
/// </value>
[JsonProperty("resumePostUri")]
public string? ResumePostUri { get; internal set; }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.DurableTask.Client;
using Microsoft.DurableTask.Client.Grpc;
using Moq;

namespace Microsoft.Azure.Functions.Worker.Tests
Expand All @@ -9,7 +9,7 @@ namespace Microsoft.Azure.Functions.Worker.Tests
/// </summary>
public class FunctionsDurableTaskClientTests
{
private FunctionsDurableTaskClient GetTestFunctionsDurableTaskClient()
private FunctionsDurableTaskClient GetTestFunctionsDurableTaskClient(string? baseUrl = null)
{
// construct mock client

Expand All @@ -22,7 +22,7 @@ private FunctionsDurableTaskClient GetTestFunctionsDurableTaskClient()
It.IsAny<string>(), It.IsAny<TerminateInstanceOptions>(), It.IsAny<CancellationToken>())).Returns(completedTask);

DurableTaskClient durableClient = durableClientMock.Object;
FunctionsDurableTaskClient client = new FunctionsDurableTaskClient(durableClient, queryString: null);
FunctionsDurableTaskClient client = new FunctionsDurableTaskClient(durableClient, queryString: null, httpBaseUrl: baseUrl);
return client;
}

Expand Down Expand Up @@ -53,5 +53,51 @@ public async void TerminateDoesNotThrow()
await client.TerminateInstanceAsync(instanceId, options);
await client.TerminateInstanceAsync(instanceId, options, token);
}

/// <summary>
/// Test that the `CreateHttpManagementPayload` method returns the expected payload structure without HttpRequestData.
/// </summary>
[Fact]
public void CreateHttpManagementPayload_WithBaseUrl()
{
const string BaseUrl = "http://localhost:7071/runtime/webhooks/durabletask";
FunctionsDurableTaskClient client = this.GetTestFunctionsDurableTaskClient(BaseUrl);
string instanceId = "testInstanceIdWithHostBaseUrl";

HttpManagementPayload payload = client.CreateHttpManagementPayload(instanceId);

AssertHttpManagementPayload(payload, BaseUrl, instanceId);
}

/// <summary>
/// Test that the `CreateHttpManagementPayload` method returns the expected payload structure with HttpRequestData.
/// </summary>
[Fact]
public void CreateHttpManagementPayload_WithHttpRequestData()
{
const string requestUrl = "http://localhost:7075/orchestrators/E1_HelloSequence";
FunctionsDurableTaskClient client = this.GetTestFunctionsDurableTaskClient();
string instanceId = "testInstanceIdWithRequest";

// Create mock HttpRequestData object.
var mockFunctionContext = new Mock<FunctionContext>();
var mockHttpRequestData = new Mock<HttpRequestData>(mockFunctionContext.Object);
mockHttpRequestData.SetupGet(r => r.Url).Returns(new Uri(requestUrl));

HttpManagementPayload payload = client.CreateHttpManagementPayload(instanceId, mockHttpRequestData.Object);

AssertHttpManagementPayload(payload, "http://localhost:7075/runtime/webhooks/durabletask", instanceId);
}

private static void AssertHttpManagementPayload(HttpManagementPayload payload, string BaseUrl, string instanceId)
{
Assert.Equal(instanceId, payload.Id);
Assert.Equal($"{BaseUrl}/instances/{instanceId}", payload.PurgeHistoryDeleteUri);
Assert.Equal($"{BaseUrl}/instances/{instanceId}/raiseEvent/{{eventName}}", payload.SendEventPostUri);
Assert.Equal($"{BaseUrl}/instances/{instanceId}", payload.StatusQueryGetUri);
Assert.Equal($"{BaseUrl}/instances/{instanceId}/terminate?reason={{{{text}}}}", payload.TerminatePostUri);
Assert.Equal($"{BaseUrl}/instances/{instanceId}/suspend?reason={{{{text}}}}", payload.SuspendPostUri);
Assert.Equal($"{BaseUrl}/instances/{instanceId}/resume?reason={{{{text}}}}", payload.ResumePostUri);
}
}
}
}
Loading