Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve reliability for waiting job to start before running steps. #3450

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions src/Runner.Common/JobServerQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue
private int _resultsServiceExceptionsCount = 0;
private Stopwatch _resultsUploadTimer = new();
private Stopwatch _actionsUploadTimer = new();
// Measures the time until the job timeline record is updated for the first time.
// Only started when job-record-updated telemetry is enabled for the job.
private Stopwatch _jobRecordUpdatedTimer = new();

/// <summary>
/// Completion source that the timeline update loop resolves (with result 0) once the job
/// timeline record has been updated for the first time. Callers await its <c>Task</c>.
/// </summary>
public TaskCompletionSource<int> JobRecordUpdated => _jobRecordUpdated;

Expand All @@ -96,6 +97,8 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue

private bool _resultsClientInitiated = false;
private bool _enableTelemetry = false;
// Set from the job variable "DistributedTask.EnableJobRecordUpdatedTelemetry";
// when true, the time until the first job record update is reported as job telemetry.
private bool _enableJobRecordUpdatedTelemetry = false;
// Set from the job variable "DistributedTask.EnableRecordUpdateAutoRetry";
// when true, timeline records from a failed update are re-enqueued for another attempt.
private bool _enableAutoRetry = false;
private delegate Task ResultsFileUploadHandler(ResultsUploadFileInfo file);

public override void Initialize(IHostContext hostContext)
Expand Down Expand Up @@ -180,6 +183,23 @@ public void Start(Pipelines.AgentJobRequestMessage jobRequest, bool resultsServi

_allDequeueTasks = new Task[] { _webConsoleLineDequeueTask, _fileUploadDequeueTask, _timelineUpdateDequeueTask, _resultsUploadDequeueTask };
_queueInProcess = true;


// Opt-in telemetry: start timing now so the time until the first job record update can be reported.
if (jobRequest.Variables.TryGetValue("DistributedTask.EnableJobRecordUpdatedTelemetry", out VariableValue enableTelemetry))
{
    _enableJobRecordUpdatedTelemetry = StringUtil.ConvertToBoolean(enableTelemetry?.Value);
    if (_enableJobRecordUpdatedTelemetry)
    {
        Trace.Info("Enable telemetry for first job record update.");
        _jobRecordUpdatedTimer.Start();
    }
}

// Opt-in auto retry for failed timeline record updates.
if (jobRequest.Variables.TryGetValue("DistributedTask.EnableRecordUpdateAutoRetry", out VariableValue enableAutoRetry))
{
    _enableAutoRetry = StringUtil.ConvertToBoolean(enableAutoRetry?.Value);
    if (_enableAutoRetry)
    {
        // Only log when the feature is actually turned on; the variable may be present with a false value.
        Trace.Info("Enable auto retry for timeline record update.");
    }
}
}

// WebConsoleLine queue and FileUpload queue are always best effort
Expand Down Expand Up @@ -232,6 +252,11 @@ public async Task ShutdownAsync()
Trace.Info(uploadTimeComparison);
_jobTelemetries.Add(new JobTelemetry() { Type = JobTelemetryType.General, Message = uploadTimeComparison });
}

// Report how long the first job record update took, when that telemetry is enabled.
// NOTE(review): if the first update never happened, the stopwatch presumably is still
// running and this reports the time elapsed so far - confirm.
if (_enableJobRecordUpdatedTelemetry)
{
    var firstJobRecordUpdateTelemetryMessage = $"First job record updated time after: {_jobRecordUpdatedTimer.ElapsedMilliseconds} ms";
    var firstJobRecordUpdateTelemetry = new JobTelemetry()
    {
        Type = JobTelemetryType.General,
        Message = firstJobRecordUpdateTelemetryMessage
    };
    _jobTelemetries.Add(firstJobRecordUpdateTelemetry);
}
}

public void QueueWebConsoleLine(Guid stepRecordId, string line, long? lineNumber)
Expand Down Expand Up @@ -729,6 +754,11 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false)
// We have changed the state of the job
Trace.Info("Job timeline record has been updated for the first time.");
_jobRecordUpdated.TrySetResult(0);

// The first job record update has completed: freeze the stopwatch so the
// elapsed time can be read later for telemetry.
if (_enableJobRecordUpdatedTelemetry)
{
_jobRecordUpdatedTimer.Stop();
}
}
}
catch (Exception ex)
Expand All @@ -740,6 +770,15 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false)
{
mainTimelineRecordsUpdateErrors.Add(ex);
}

// When auto retry is enabled (and this is not a one-shot run), put every distinct
// pending record id of the failed update back on its timeline queue so it is attempted again.
if (_enableAutoRetry && !runOnce)
{
    var recordIdsToRetry = update.PendingRecords.Select(pending => pending.Id).Distinct();
    foreach (var recordIdToRetry in recordIdsToRetry)
    {
        Trace.Verbose("Enqueue timeline record {0} update for retry.", recordIdToRetry);

        // Only the id is set on the re-enqueued record.
        var retryRecord = new TimelineRecord() { Id = recordIdToRetry };
        _timelineUpdateQueue[update.TimelineId].Enqueue(retryRecord);
    }
}
}
}
}
Expand Down
10 changes: 8 additions & 2 deletions src/Runner.Worker/JobRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,14 @@ public async Task<TaskResult> RunAsync(AgentJobRequestMessage message, Cancellat
// Server won't issue ID_TOKEN for non-inprogress job.
// If the job is trying to use OIDC feature, we want the job to be marked as in-progress before running any customer's steps as much as we can.
// The timeline record update background process runs every 500ms, so waiting 1000ms (the default) is enough in most cases.
Trace.Info($"Waiting for job to be marked as started.");
await Task.WhenAny(_jobServerQueue.JobRecordUpdated.Task, Task.Delay(1000));
// Maximum time to wait for the first job record update, read from a job variable.
var maxWaitTimeInSeconds = jobContext.Global.Variables.GetInt("DistributedTask.FirstJobRecordUpdateWaitTimeInSeconds");
if (maxWaitTimeInSeconds == null || maxWaitTimeInSeconds.Value < 0)
{
    // Missing or negative value: fall back to the 1 second default.
    // A negative delay would make Task.Delay throw ArgumentOutOfRangeException.
    maxWaitTimeInSeconds = 1;
}

// Convert to milliseconds using long arithmetic and clamp to int.MaxValue,
// so a very large configured value cannot overflow into a negative delay.
long maxWaitTimeInMilliseconds = maxWaitTimeInSeconds.Value * 1000L;
int firstRecordUpdateDelayInMilliseconds = maxWaitTimeInMilliseconds > int.MaxValue
    ? int.MaxValue
    : (int)maxWaitTimeInMilliseconds;

Trace.Info($"Waiting {maxWaitTimeInSeconds.Value} seconds for job to be marked as started.");

// Continue as soon as the job record is updated, or once the wait time elapses, whichever comes first.
await Task.WhenAny(_jobServerQueue.JobRecordUpdated.Task, Task.Delay(firstRecordUpdateDelayInMilliseconds));
}

// Run all job steps
Expand Down
Loading