Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-add ScheduledStartTime to orchestration scheduling in .NET isolated #2805

Merged
merged 6 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 25 additions & 10 deletions src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,26 @@ public override Task<Empty> Hello(Empty request, ServerCallContext context)
{
try
{
string instanceId = await this.GetClient(context).StartNewAsync(
request.Name, request.InstanceId, Raw(request.Input));
string instanceId = request.InstanceId ?? Guid.NewGuid().ToString("N");
TaskHubClient taskhubClient = new TaskHubClient(this.GetDurabilityProvider(context));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may want to also provider ILoggerFactory to the ctor

OrchestrationInstance instance;

// TODO: Ideally, we'd have a single method in the taskhubClient that can handle both scheduled and non-scheduled starts.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this was addressed in the durable-dotnet DurableTaskClient abstraction.

// TODO: the type of `ScheduledStartTimestamp` is not nullable. Can we change it to `DateTime?` in the proto file?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe proto uses its own type: Timestamp, which is an object, so it can be null (whereas DateTime and DateTimeOffset are structs). So no, we cannot change it to DateTime?.

if (request.ScheduledStartTimestamp != null)
{
instance = await taskhubClient.CreateScheduledOrchestrationInstanceAsync(
name: request.Name, version: request.Version, instanceId: instanceId, input: Raw(request.Input), startAt: request.ScheduledStartTimestamp.ToDateTime());
}
else
{
instance = await taskhubClient.CreateOrchestrationInstanceAsync(request.Name, request.Version, instanceId, Raw(request.Input));
}

// TODO: should this not include the ExecutionId and other elements of the taskhubClient response?
return new P.CreateInstanceResponse
{
InstanceId = instanceId,
InstanceId = instance.InstanceId,
};
}
catch (OrchestrationAlreadyExistsException)
Expand Down Expand Up @@ -231,13 +246,13 @@ public override Task<Empty> Hello(Empty request, ServerCallContext context)
EntityBackendQueries.EntityQueryResult result = await entityOrchestrationService.EntityBackendQueries!.QueryEntitiesAsync(
new EntityBackendQueries.EntityQuery()
{
InstanceIdStartsWith = query.InstanceIdStartsWith,
LastModifiedFrom = query.LastModifiedFrom?.ToDateTime(),
LastModifiedTo = query.LastModifiedTo?.ToDateTime(),
IncludeTransient = query.IncludeTransient,
IncludeState = query.IncludeState,
ContinuationToken = query.ContinuationToken,
PageSize = query.PageSize,
InstanceIdStartsWith = query.InstanceIdStartsWith,
LastModifiedFrom = query.LastModifiedFrom?.ToDateTime(),
LastModifiedTo = query.LastModifiedTo?.ToDateTime(),
IncludeTransient = query.IncludeTransient,
IncludeState = query.IncludeState,
ContinuationToken = query.ContinuationToken,
PageSize = query.PageSize,
Comment on lines -234 to +255
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ran a formatter and it fixed this.

},
context.CancellationToken);

Expand Down
22 changes: 22 additions & 0 deletions test/e2e/Apps/BasicDotNetIsolated/HelloCities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,27 @@ public static async Task<HttpResponseData> HttpStart(
// See https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-http-api#start-orchestration
return await client.CreateCheckStatusResponseAsync(req, instanceId);
}

[Function("HelloCities_HttpStart_Scheduled")]
public static async Task<HttpResponseData> HttpStartScheduled(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req,
[DurableClient] DurableTaskClient client,
FunctionContext executionContext,
DateTime scheduledStartTime)
{
ILogger logger = executionContext.GetLogger("HelloCities_HttpStart");

var startOptions = new StartOrchestrationOptions(StartAt: scheduledStartTime);

// Function input comes from the request content.
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
nameof(HelloCities), startOptions);

logger.LogInformation("Started orchestration with ID = '{instanceId}'.", instanceId);

// Returns an HTTP 202 response with an instance management payload.
// See https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-http-api#start-orchestration
return await client.CreateCheckStatusResponseAsync(req, instanceId);
}
}
}
2 changes: 1 addition & 1 deletion test/e2e/Tests/E2ETests.csproj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
Expand Down
59 changes: 59 additions & 0 deletions test/e2e/Tests/Helpers/DurableHelpers.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Text.Json.Nodes;

namespace Microsoft.Azure.Durable.Tests.DotnetIsolatedE2E;

internal class DurableHelpers
{
static readonly HttpClient _httpClient = new HttpClient();

internal class OrchestrationStatusDetails
{
public string RuntimeStatus { get; set; } = string.Empty;
public string Input { get; set; } = string.Empty;
public string Output { get; set; } = string.Empty;
public DateTime CreatedTime { get; set; }
public DateTime LastUpdatedTime { get; set; }
public OrchestrationStatusDetails(string statusQueryResponse)
{
JsonNode? statusQueryJsonNode = JsonNode.Parse(statusQueryResponse);
if (statusQueryJsonNode == null)
{
return;
}
this.RuntimeStatus = statusQueryJsonNode["runtimeStatus"]?.GetValue<string>() ?? string.Empty;
this.Input = statusQueryJsonNode["input"]?.ToString() ?? string.Empty;
this.Output = statusQueryJsonNode["output"]?.ToString() ?? string.Empty;
this.CreatedTime = DateTime.Parse(statusQueryJsonNode["createdTime"]?.GetValue<string>() ?? string.Empty).ToUniversalTime();
this.LastUpdatedTime = DateTime.Parse(statusQueryJsonNode["lastUpdatedTime"]?.GetValue<string>() ?? string.Empty).ToUniversalTime();
}
}

internal static string ParseStatusQueryGetUri(HttpResponseMessage invocationStartResponse)
{
string? responseString = invocationStartResponse.Content?.ReadAsStringAsync().Result;

if (string.IsNullOrEmpty(responseString))
{
return string.Empty;
}
JsonNode? responseJsonNode = JsonNode.Parse(responseString);
if (responseJsonNode == null)
{
return string.Empty;
}

string? statusQueryGetUri = responseJsonNode["StatusQueryGetUri"]?.GetValue<string>();
return statusQueryGetUri ?? string.Empty;
}
internal static OrchestrationStatusDetails GetRunningOrchestrationDetails(string statusQueryGetUri)
{
var statusQueryResponse = _httpClient.GetAsync(statusQueryGetUri);

string? statusQueryResponseString = statusQueryResponse.Result.Content.ReadAsStringAsync().Result;

return new OrchestrationStatusDetails(statusQueryResponseString);
}
}
49 changes: 43 additions & 6 deletions test/e2e/Tests/Tests/HelloCitiesTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,66 @@

namespace Microsoft.Azure.Durable.Tests.DotnetIsolatedE2E;

[Collection(Constants.FunctionAppCollectionName )]
[Collection(Constants.FunctionAppCollectionName)]
public class HttpEndToEndTests
{
private readonly FunctionAppFixture _fixture;
private readonly ITestOutputHelper _output;

public HttpEndToEndTests(FunctionAppFixture fixture, ITestOutputHelper testOutputHelper)
{
_fixture = fixture;
_fixture.TestLogs.UseTestLogger(testOutputHelper);
_output = testOutputHelper;
}

[Theory]
[InlineData("HelloCities_HttpStart", "", HttpStatusCode.Accepted, "")]
public async Task HttpTriggerTests(string functionName, string queryString, HttpStatusCode expectedStatusCode, string expectedMessage)
[InlineData("HelloCities_HttpStart", HttpStatusCode.Accepted, "Hello Tokyo!")]
public async Task HttpTriggerTests(string functionName, HttpStatusCode expectedStatusCode, string partialExpectedOutput)
{
using HttpResponseMessage response = await HttpHelpers.InvokeHttpTrigger(functionName, queryString);
using HttpResponseMessage response = await HttpHelpers.InvokeHttpTrigger(functionName, "");
string actualMessage = await response.Content.ReadAsStringAsync();

Assert.Equal(expectedStatusCode, response.StatusCode);
string statusQueryGetUri = DurableHelpers.ParseStatusQueryGetUri(response);
Thread.Sleep(1000);
var orchestrationDetails = DurableHelpers.GetRunningOrchestrationDetails(statusQueryGetUri);
Assert.Equal("Completed", orchestrationDetails.RuntimeStatus);
Assert.Contains(partialExpectedOutput, orchestrationDetails.Output);
}

[Theory]
[InlineData("HelloCities_HttpStart_Scheduled", 5, HttpStatusCode.Accepted)]
[InlineData("HelloCities_HttpStart_Scheduled", -5, HttpStatusCode.Accepted)]
public async Task ScheduledStartTests(string functionName, int startDelaySeconds, HttpStatusCode expectedStatusCode)
{
var testStartTime = DateTime.UtcNow;
var scheduledStartTime = testStartTime + TimeSpan.FromSeconds(startDelaySeconds);
string urlQueryString = $"?ScheduledStartTime={scheduledStartTime.ToString("o")}";

using HttpResponseMessage response = await HttpHelpers.InvokeHttpTrigger(functionName, urlQueryString);
string actualMessage = await response.Content.ReadAsStringAsync();

if (!string.IsNullOrEmpty(expectedMessage))
string statusQueryGetUri = DurableHelpers.ParseStatusQueryGetUri(response);

Assert.Equal(expectedStatusCode, response.StatusCode);

var orchestrationDetails = DurableHelpers.GetRunningOrchestrationDetails(statusQueryGetUri);
while (DateTime.UtcNow < scheduledStartTime)
{
Assert.False(string.IsNullOrEmpty(actualMessage));
_output.WriteLine($"Test scheduled for {scheduledStartTime}, current time {DateTime.Now}");
orchestrationDetails = DurableHelpers.GetRunningOrchestrationDetails(statusQueryGetUri);
Assert.Equal("Pending", orchestrationDetails.RuntimeStatus);
Thread.Sleep(3000);
}

// Give a small amount of time for the orchestration to complete, even if scheduled to run immediately
Thread.Sleep(1000);
_output.WriteLine($"Test scheduled for {scheduledStartTime}, current time {DateTime.Now}, looking for completed");

var finalOrchestrationDetails = DurableHelpers.GetRunningOrchestrationDetails(statusQueryGetUri);
Assert.Equal("Completed", finalOrchestrationDetails.RuntimeStatus);

Assert.True(finalOrchestrationDetails.LastUpdatedTime > scheduledStartTime);
}
}
Loading