Skip to content

Commit

Permalink
Refactor event handling and cancellation in EventQueue and ResourceMa…
Browse files Browse the repository at this point in the history
…nager

Refactored the `EventQueue.cs` and `ResourceManager.cs` files to improve event handling and cancellation. The `try-catch` block has been moved inside the `while` loop in `EventQueue.cs` to ensure continuous event processing despite exceptions. The cancellation token check has been moved to the start of the `while` loop for immediate loop exit upon cancellation request. Simplified the `if` condition checking for null or cancelled events by removing nested `if` statements. In `ResourceManager.cs`, a `try-catch` block has been added inside the `async` lambda function to prevent exceptions from stopping the processing of subsequent events. The cancellation token check has been moved inside the `try` block to throw an exception upon cancellation request, allowing it to be caught and handled.
  • Loading branch information
marcusbooyah committed Apr 4, 2024
1 parent 45a2d4a commit bd8ec13
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 93 deletions.
102 changes: 48 additions & 54 deletions src/Neon.Operator/ResourceManager/EventQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -191,45 +191,42 @@ private async Task ReconcileConsumerAsync(CancellationToken cancellationToken =

using var worker = metrics.ActiveWorkers.TrackInProgress();

try
while (await eventChannel.Reader.WaitToReadAsync(cancellationToken))
{
while (await eventChannel.Reader.WaitToReadAsync(cancellationToken))
cancellationToken.ThrowIfCancellationRequested();

if (eventChannel.Reader.TryRead(out var uid))
{
cancellationToken.ThrowIfCancellationRequested();
var @event = queue.Keys.Where(key => key.Value.Uid() == uid).FirstOrDefault();

if (eventChannel.Reader.TryRead(out var uid))
if (@event == null || @event.Value == null || queue[@event].IsCancellationRequested)
{
var @event = queue.Keys.Where(key => key.Value.Uid() == uid).FirstOrDefault();

if (@event == null || @event.Value == null || queue[@event].IsCancellationRequested)
{
continue;
}
continue;
}

try
{
currentEvents.TryAdd(uid, DateTime.UtcNow);
try
{
currentEvents.TryAdd(uid, DateTime.UtcNow);

metrics.QueueDurationSeconds.Observe((DateTime.UtcNow - @event.CreatedAt).TotalSeconds);
logger?.LogDebugEx(() => $"Executing event [{@event.Type}] for resource [{@event.Value.Kind}/{@event.Value.Name()}]");
metrics.QueueDurationSeconds.Observe((DateTime.UtcNow - @event.CreatedAt).TotalSeconds);
logger?.LogDebugEx(() => $"Executing event [{@event.Type}] for resource [{@event.Value.Kind}/{@event.Value.Name()}]");

using (var timer = metrics.WorkDurationSeconds.NewTimer())
{
await eventHandler?.Invoke(@event);
}
}
finally
using (var timer = metrics.WorkDurationSeconds.NewTimer())
{
currentEvents.Remove(uid, out _);
await eventHandler?.Invoke(@event);
}
}
catch (Exception e)
{
logger?.LogErrorEx(e);
throw;
}
finally
{
currentEvents.Remove(uid, out _);
}
}
}
catch (Exception e)
{
logger?.LogErrorEx(e);
throw;
}

if (cancellationToken.IsCancellationRequested)
{
Expand All @@ -247,45 +244,42 @@ private async Task FinalizerConsumerAsync(CancellationToken cancellationToken =

using var worker = metrics.ActiveWorkers.TrackInProgress();

try
while (await finalizeChannel.Reader.WaitToReadAsync(cancellationToken))
{
while (await finalizeChannel.Reader.WaitToReadAsync(cancellationToken))
cancellationToken.ThrowIfCancellationRequested();

if (finalizeChannel.Reader.TryRead(out var uid))
{
cancellationToken.ThrowIfCancellationRequested();
var @event = queue.Keys.Where(key => key.Value.Uid() == uid).FirstOrDefault();

if (finalizeChannel.Reader.TryRead(out var uid))
if (@event == null || @event.Value == null || queue[@event].IsCancellationRequested)
{
var @event = queue.Keys.Where(key => key.Value.Uid() == uid).FirstOrDefault();

if (@event == null || @event.Value == null || queue[@event].IsCancellationRequested)
{
continue;
}
continue;
}

try
{
currentEvents.TryAdd(uid, DateTime.UtcNow);
try
{
currentEvents.TryAdd(uid, DateTime.UtcNow);

metrics.QueueDurationSeconds.Observe((DateTime.UtcNow - @event.CreatedAt).TotalSeconds);
logger?.LogDebugEx(() => $"Executing event [{@event.Type}] for resource [{@event.Value.Kind}/{@event.Value.Name()}]");
metrics.QueueDurationSeconds.Observe((DateTime.UtcNow - @event.CreatedAt).TotalSeconds);
logger?.LogDebugEx(() => $"Executing event [{@event.Type}] for resource [{@event.Value.Kind}/{@event.Value.Name()}]");

using (var timer = metrics.WorkDurationSeconds.NewTimer())
{
await eventHandler?.Invoke(@event);
}
}
finally
using (var timer = metrics.WorkDurationSeconds.NewTimer())
{
currentEvents.Remove(uid, out _);
await eventHandler?.Invoke(@event);
}
}
catch (Exception e)
{
logger?.LogErrorEx(e);
throw;
}
finally
{
currentEvents.Remove(uid, out _);
}
}
}
catch (Exception e)
{
logger?.LogErrorEx(e);
throw;
}

if (cancellationToken.IsCancellationRequested)
{
Expand Down
86 changes: 47 additions & 39 deletions src/Neon.Operator/ResourceManager/ResourceManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1039,59 +1039,67 @@ await eventQueue.RequeueAsync(
{
using (var activity = TraceContext.ActivitySource?.StartActivity("EnqueueResourceEvent", ActivityKind.Server))
{
cancellationToken.ThrowIfCancellationRequested();
try
{
cancellationToken.ThrowIfCancellationRequested();
var resource = @event.Value;
var resourceName = resource.Metadata.Name;
var resource = @event.Value;
var resourceName = resource.Metadata.Name;
logger?.LogDebugEx(() => $"Resource {resource.Kind} {resource.Namespace()}/{resource.Name()} received {@event.Type} event.");
logger?.LogDebugEx(() => $"Resource {resource.Kind} {resource.Namespace()}/{resource.Name()} received {@event.Type} event.");
resourceCache.Compare(resource, out var modifiedEventType);
resourceCache.Compare(resource, out var modifiedEventType);
@event.ModifiedEventType = modifiedEventType;
@event.ModifiedEventType = modifiedEventType;
switch (@event.Type)
{
case (k8s.WatchEventType)WatchEventType.Added:
case (k8s.WatchEventType)WatchEventType.Deleted:
case (k8s.WatchEventType)WatchEventType.Modified:
switch (@event.Type)
{
case (k8s.WatchEventType)WatchEventType.Added:
case (k8s.WatchEventType)WatchEventType.Deleted:
case (k8s.WatchEventType)WatchEventType.Modified:
await eventQueue.DequeueAsync(@event, cancellationToken: cancellationToken);
await eventQueue.EnqueueAsync(@event, cancellationToken: cancellationToken);
break;
await eventQueue.DequeueAsync(@event, cancellationToken: cancellationToken);
await eventQueue.EnqueueAsync(@event, cancellationToken: cancellationToken);
break;
case (k8s.WatchEventType)WatchEventType.Bookmark:
case (k8s.WatchEventType)WatchEventType.Bookmark:
break; // We don't care about these.
break; // We don't care about these.
case (k8s.WatchEventType)WatchEventType.Error:
case (k8s.WatchEventType)WatchEventType.Error:
// I believe we're only going to see this for extreme scenarios, like:
//
// 1. The CRD we're watching was deleted and recreated.
// 2. The watcher is so far behind that part of the
// history is no longer available.
//
// We're going to log this and terminate the application, expecting
// that Kubernetes will reschedule it so we can start over.
// I believe we're only going to see this for extreme scenarios, like:
//
// 1. The CRD we're watching was deleted and recreated.
// 2. The watcher is so far behind that part of the
// history is no longer available.
//
// We're going to log this and terminate the application, expecting
// that Kubernetes will reschedule it so we can start over.
var stub = new TEntity();
var stub = new TEntity();
if (!string.IsNullOrEmpty(resource.Namespace()))
{
logger?.LogCriticalEx(() => $"Critical error watching: [namespace={resource.Namespace()}] {stub.ApiGroupAndVersion}/{stub.Kind}");
}
else
{
logger?.LogCriticalEx(() => $"Critical error watching: {stub.ApiGroupAndVersion}/{stub.Kind}");
}
if (!string.IsNullOrEmpty(resource.Namespace()))
{
logger?.LogCriticalEx(() => $"Critical error watching: [namespace={resource.Namespace()}] {stub.ApiGroupAndVersion}/{stub.Kind}");
}
else
{
logger?.LogCriticalEx(() => $"Critical error watching: {stub.ApiGroupAndVersion}/{stub.Kind}");
}
logger?.LogCriticalEx("Terminating the pod so Kubernetes can reschedule it and we can restart the watch.");
Environment.Exit(1);
break;
logger?.LogCriticalEx("Terminating the pod so Kubernetes can reschedule it and we can restart the watch.");
Environment.Exit(1);
break;
default:
break;
default:
break;
}
}
catch (Exception e)
{
logger?.LogErrorEx(e);
throw;
}
}
};
Expand Down

0 comments on commit bd8ec13

Please sign in to comment.