Skip to content

Commit

Permalink
Update the job hosting to run the task count per queue type (#3997)
Browse files Browse the repository at this point in the history
* Updating the job hosting to run the max running task count per queue type.

* Adding a new parameter for the running job count to JobHosting.ExecuteAsync.

* Adding a null check on _hostingConfiguration.
  • Loading branch information
v-iyamauchi authored Aug 1, 2024
1 parent 2214c77 commit c971d72
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
if (_hostingConfiguration != null)
{
jobHostingValue.PollingFrequencyInSeconds = _hostingConfiguration.PollingFrequencyInSeconds ?? jobHostingValue.PollingFrequencyInSeconds;
jobHostingValue.MaxRunningJobCount = _hostingConfiguration.MaxRunningTaskCount ?? jobHostingValue.MaxRunningJobCount;
jobHostingValue.JobHeartbeatIntervalInSeconds = _hostingConfiguration.TaskHeartbeatIntervalInSeconds ?? jobHostingValue.JobHeartbeatIntervalInSeconds;
jobHostingValue.JobHeartbeatTimeoutThresholdInSeconds = _hostingConfiguration.TaskHeartbeatTimeoutThresholdInSeconds ?? jobHostingValue.JobHeartbeatTimeoutThresholdInSeconds;
}
Expand All @@ -74,7 +73,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)

foreach (var operation in _operationsConfiguration.HostingBackgroundServiceQueues)
{
jobQueues.Add(jobHostingValue.ExecuteAsync((byte)operation.Queue, Environment.MachineName, cancellationTokenSource));
short runningJobCount = operation.MaxRunningTaskCount ?? _hostingConfiguration?.MaxRunningTaskCount ?? Constants.DefaultMaxRunningJobCount;
jobQueues.Add(jobHostingValue.ExecuteAsync((byte)operation.Queue, runningJobCount, Environment.MachineName, cancellationTokenSource));
}

await Task.WhenAll(jobQueues);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,16 @@ namespace Microsoft.Health.Fhir.Core.Configs;

public class HostingBackgroundServiceQueueItem
{
/// <summary>
/// Gets or sets the queue type.
/// </summary>
public QueueType Queue { get; set; }

/// <summary>
/// Gets or sets the max running task count at the same time for this queue type.
/// </summary>
public short? MaxRunningTaskCount { get; set; }

// TODO: This is not honored. Make sure that it is not used in PaaS and remove.
public bool UpdateProgressOnHeartbeat { get; set; }
}
24 changes: 8 additions & 16 deletions src/Microsoft.Health.TaskManagement.UnitTests/JobHostingTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,11 @@ public async Task GivenValidJobs_WhenJobHostingStart_ThenJobsShouldBeExecute()

JobHosting jobHosting = new JobHosting(queueClient, factory, _logger);
jobHosting.PollingFrequencyInSeconds = 0;
jobHosting.MaxRunningJobCount = 5;

CancellationTokenSource tokenSource = new CancellationTokenSource();

tokenSource.CancelAfter(TimeSpan.FromSeconds(2));
await jobHosting.ExecuteAsync(0, "test", tokenSource);
await jobHosting.ExecuteAsync(0, 5, "test", tokenSource);

Assert.Equal(jobCount, executedJobCount);
foreach (JobInfo job in jobs)
Expand Down Expand Up @@ -121,12 +120,11 @@ public async Task GivenJobWithCriticalException_WhenJobHostingStart_ThenJobShoul

JobHosting jobHosting = new JobHosting(queueClient, factory, _logger);
jobHosting.PollingFrequencyInSeconds = 0;
jobHosting.MaxRunningJobCount = 1;

CancellationTokenSource tokenSource = new CancellationTokenSource();

tokenSource.CancelAfter(TimeSpan.FromSeconds(1));
await jobHosting.ExecuteAsync(0, "test", tokenSource);
await jobHosting.ExecuteAsync(0, 1, "test", tokenSource);

Assert.Equal(2, executeCount);

Expand Down Expand Up @@ -163,13 +161,12 @@ public async Task GivenAnCrashJob_WhenJobHostingStart_ThenJobShouldBeRePickup()

JobHosting jobHosting = new JobHosting(queueClient, factory, _logger);
jobHosting.PollingFrequencyInSeconds = 0;
jobHosting.MaxRunningJobCount = 1;
jobHosting.JobHeartbeatTimeoutThresholdInSeconds = 1;

CancellationTokenSource tokenSource = new CancellationTokenSource();

tokenSource.CancelAfter(TimeSpan.FromSeconds(2));
await jobHosting.ExecuteAsync(0, "test", tokenSource);
await jobHosting.ExecuteAsync(0, 1, "test", tokenSource);

Assert.Equal(JobStatus.Completed, job1.Status);
Assert.Equal(1, executeCount0);
Expand Down Expand Up @@ -198,12 +195,11 @@ public async Task GivenAnLongRunningJob_WhenJobHostingStop_ThenJobShouldBeComple
JobInfo job1 = (await queueClient.EnqueueAsync(0, new string[] { "job1" }, null, false, false, CancellationToken.None)).First();
JobHosting jobHosting = new JobHosting(queueClient, factory, _logger);
jobHosting.PollingFrequencyInSeconds = 0;
jobHosting.MaxRunningJobCount = 1;
jobHosting.JobHeartbeatTimeoutThresholdInSeconds = 1;

CancellationTokenSource tokenSource = new CancellationTokenSource();

Task hostingTask = jobHosting.ExecuteAsync(0, "test", tokenSource);
Task hostingTask = jobHosting.ExecuteAsync(0, 1, "test", tokenSource);
autoResetEvent.WaitOne();
tokenSource.Cancel();

Expand Down Expand Up @@ -238,13 +234,12 @@ public async Task GivenJobWithInvalidOperationException_WhenJobHostingStart_Then

JobHosting jobHosting = new JobHosting(queueClient, factory, _logger);
jobHosting.PollingFrequencyInSeconds = 0;
jobHosting.MaxRunningJobCount = 1;
jobHosting.JobHeartbeatTimeoutThresholdInSeconds = 1;

CancellationTokenSource tokenSource = new CancellationTokenSource();

tokenSource.CancelAfter(TimeSpan.FromSeconds(2));
await jobHosting.ExecuteAsync(0, "test", tokenSource);
await jobHosting.ExecuteAsync(0, 1, "test", tokenSource);

Assert.Equal(JobStatus.Failed, job1.Status);
Assert.Equal(1, executeCount0);
Expand Down Expand Up @@ -276,12 +271,11 @@ public async Task GivenJobWithCanceledException_WhenJobHostingStart_ThenJobShoul

JobHosting jobHosting = new JobHosting(queueClient, factory, _logger);
jobHosting.PollingFrequencyInSeconds = 0;
jobHosting.MaxRunningJobCount = 1;
jobHosting.JobHeartbeatTimeoutThresholdInSeconds = 15;

CancellationTokenSource tokenSource = new CancellationTokenSource();
tokenSource.CancelAfter(TimeSpan.FromSeconds(2));
await jobHosting.ExecuteAsync(0, "test", tokenSource);
await jobHosting.ExecuteAsync(0, 1, "test", tokenSource);

Assert.Equal(JobStatus.Cancelled, job1.Status);
Assert.Equal(1, executeCount0);
Expand Down Expand Up @@ -313,11 +307,10 @@ public async Task GivenJobRunning_WhenCancel_ThenJobShouldBeCancelled()
JobHosting jobHosting = new JobHosting(queueClient, factory, _logger);
jobHosting.PollingFrequencyInSeconds = 0;
jobHosting.JobHeartbeatIntervalInSeconds = 1;
jobHosting.MaxRunningJobCount = 1;

CancellationTokenSource tokenSource = new CancellationTokenSource();
tokenSource.CancelAfter(TimeSpan.FromSeconds(2));
Task hostingTask = jobHosting.ExecuteAsync(0, "test", tokenSource);
Task hostingTask = jobHosting.ExecuteAsync(0, 1, "test", tokenSource);

autoResetEvent.WaitOne();
await queueClient.CancelJobByGroupIdAsync(0, job1.GroupId, CancellationToken.None);
Expand Down Expand Up @@ -392,13 +385,12 @@ public async Task GivenRandomFailuresInQueueClient_WhenStartHosting_ThenAllTasks

var jobHosting = new JobHosting(queueClient, factory, _logger);
jobHosting.PollingFrequencyInSeconds = 0;
jobHosting.MaxRunningJobCount = 10;
jobHosting.JobHeartbeatIntervalInSeconds = 0.001;
jobHosting.JobHeartbeatTimeoutThresholdInSeconds = 1;

var tokenSource = new CancellationTokenSource();
tokenSource.CancelAfter(TimeSpan.FromSeconds(60));
var host = Task.Run(async () => await jobHosting.ExecuteAsync(0, "test", tokenSource));
var host = Task.Run(async () => await jobHosting.ExecuteAsync(0, 10, "test", tokenSource));
while (jobs.Where(t => t.Status == JobStatus.Completed).Count() < numberOfJobs && !tokenSource.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromSeconds(1));
Expand Down
6 changes: 2 additions & 4 deletions src/Microsoft.Health.TaskManagement/JobHosting.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,16 @@ public JobHosting(IQueueClient queueClient, IJobFactory jobFactory, ILogger<JobH

public int PollingFrequencyInSeconds { get; set; } = Constants.DefaultPollingFrequencyInSeconds;

public short MaxRunningJobCount { get; set; } = Constants.DefaultMaxRunningJobCount;

public int JobHeartbeatTimeoutThresholdInSeconds { get; set; } = Constants.DefaultJobHeartbeatTimeoutThresholdInSeconds;

public double JobHeartbeatIntervalInSeconds { get; set; } = Constants.DefaultJobHeartbeatIntervalInSeconds;

public async Task ExecuteAsync(byte queueType, string workerName, CancellationTokenSource cancellationTokenSource)
public async Task ExecuteAsync(byte queueType, short runningJobCount, string workerName, CancellationTokenSource cancellationTokenSource)
{
var workers = new List<Task>();

// parallel dequeue
for (var thread = 0; thread < MaxRunningJobCount; thread++)
for (var thread = 0; thread < runningJobCount; thread++)
{
workers.Add(Task.Run(async () =>
{
Expand Down

0 comments on commit c971d72

Please sign in to comment.