Skip to content

Commit

Permalink
Improve reliability for waiting job to start before running steps.
Browse files Browse the repository at this point in the history
  • Loading branch information
TingluoHuang committed Sep 4, 2024
1 parent 36c66c8 commit ec9bdfc
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 2 deletions.
39 changes: 39 additions & 0 deletions src/Runner.Common/JobServerQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue
private int _resultsServiceExceptionsCount = 0;
private Stopwatch _resultsUploadTimer = new();
private Stopwatch _actionsUploadTimer = new();
private Stopwatch _jobRecordUpdatedTimer = new();

public TaskCompletionSource<int> JobRecordUpdated => _jobRecordUpdated;

Expand All @@ -96,6 +97,8 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue

private bool _resultsClientInitiated = false;
private bool _enableTelemetry = false;
private bool _enableJobRecordUpdatedTelemetry = false;
private bool _enableAutoRetry = false;
private delegate Task ResultsFileUploadHandler(ResultsUploadFileInfo file);

public override void Initialize(IHostContext hostContext)
Expand Down Expand Up @@ -180,6 +183,23 @@ public void Start(Pipelines.AgentJobRequestMessage jobRequest, bool resultsServi

_allDequeueTasks = new Task[] { _webConsoleLineDequeueTask, _fileUploadDequeueTask, _timelineUpdateDequeueTask, _resultsUploadDequeueTask };
_queueInProcess = true;


if (jobRequest.Variables.TryGetValue("DistributedTask.EnableJobRecordUpdatedTelemetry", out VariableValue enableTelemetry))
{
_enableJobRecordUpdatedTelemetry = StringUtil.ConvertToBoolean(enableTelemetry?.Value);
if (_enableJobRecordUpdatedTelemetry)
{
Trace.Info("Enable telemetry for first job record update.");
_jobRecordUpdatedTimer.Start();
}
}

// Opt-in flag read from the job variables; it gates the re-enqueue of pending timeline
// records in ProcessTimelinesUpdateQueueAsync.
if (jobRequest.Variables.TryGetValue("DistributedTask.EnableRecordUpdateAutoRetry", out VariableValue enableAutoRetry))
{
    _enableAutoRetry = StringUtil.ConvertToBoolean(enableAutoRetry?.Value);

    // The variable can be present with a false value, so log only when the feature is
    // really enabled (same pattern as the job-record-updated telemetry flag).
    if (_enableAutoRetry)
    {
        Trace.Info("Enable auto retry for timeline record update.");
    }
}
}

// WebConsoleLine queue and FileUpload queue are always best effort
Expand Down Expand Up @@ -232,6 +252,11 @@ public async Task ShutdownAsync()
Trace.Info(uploadTimeComparison);
_jobTelemetries.Add(new JobTelemetry() { Type = JobTelemetryType.General, Message = uploadTimeComparison });
}

if (_enableJobRecordUpdatedTelemetry)
{
_jobTelemetries.Add(new JobTelemetry() { Type = JobTelemetryType.General, Message = $"First job record updated time after: {_jobRecordUpdatedTimer.ElapsedMilliseconds} ms" });
}
}

public void QueueWebConsoleLine(Guid stepRecordId, string line, long? lineNumber)
Expand Down Expand Up @@ -729,6 +754,11 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false)
// We have changed the state of the job
Trace.Info("Job timeline record has been updated for the first time.");
_jobRecordUpdated.TrySetResult(0);

if (_enableJobRecordUpdatedTelemetry)
{
_jobRecordUpdatedTimer.Stop();
}
}
}
catch (Exception ex)
Expand All @@ -740,6 +770,15 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false)
{
mainTimelineRecordsUpdateErrors.Add(ex);
}

if (!runOnce && _enableAutoRetry)
{
foreach (var retryRecordId in update.PendingRecords.DistinctBy(x => x.Id).Select(r => r.Id))
{
Trace.Verbose("Enqueue timeline record {0} update for retry.", retryRecordId);
_timelineUpdateQueue[update.TimelineId].Enqueue(new TimelineRecord() { Id = retryRecordId });
}
}
}
}
}
Expand Down
10 changes: 8 additions & 2 deletions src/Runner.Worker/JobRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,14 @@ public async Task<TaskResult> RunAsync(AgentJobRequestMessage message, Cancellat
// Server won't issue ID_TOKEN for non-inprogress job.
// If the job is trying to use OIDC feature, we want the job to be marked as in-progress before running any customer's steps as much as we can.
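// The timeline record update background process runs every 500ms, so the default wait of 1 second is enough in most cases.
// The wait can be overridden with the DistributedTask.FirstJobRecordUpdateWaitTimeInSeconds variable.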
Trace.Info($"Waiting for job to be marked as started.");
await Task.WhenAny(_jobServerQueue.JobRecordUpdated.Task, Task.Delay(1000));
// Upper bound (in seconds) on how long to wait for the first job record update.
// Task.Delay(int) takes milliseconds and throws ArgumentOutOfRangeException for values
// below -1, so the configured value is sanitized before it is multiplied by 1000:
//   - missing or negative -> fall back to the 1 second default
//   - too large           -> capped so that `seconds * 1000` cannot overflow an int
const int defaultWaitTimeInSeconds = 1;
const int maxSupportedWaitTimeInSeconds = int.MaxValue / 1000;

var configuredWaitTimeInSeconds = jobContext.Global.Variables.GetInt("DistributedTask.FirstJobRecordUpdateWaitTimeInSeconds");
var maxWaitTimeInSeconds = defaultWaitTimeInSeconds;
if (configuredWaitTimeInSeconds != null && configuredWaitTimeInSeconds.Value >= 0)
{
    maxWaitTimeInSeconds = configuredWaitTimeInSeconds.Value > maxSupportedWaitTimeInSeconds
        ? maxSupportedWaitTimeInSeconds
        : configuredWaitTimeInSeconds.Value;
}

Trace.Info($"Waiting {maxWaitTimeInSeconds} seconds for job to be marked as started.");

// Continue as soon as the job record has been updated, or once the wait time elapses.
await Task.WhenAny(_jobServerQueue.JobRecordUpdated.Task, Task.Delay(maxWaitTimeInSeconds * 1000));
}

// Run all job steps
Expand Down

0 comments on commit ec9bdfc

Please sign in to comment.