From 27d7edcbf65b1c971032af5a953dfe03af346109 Mon Sep 17 00:00:00 2001
From: Dejan Zele Pejchev
Date: Wed, 4 Dec 2024 14:29:57 +0100
Subject: [PATCH] add postprocessing logic for sync tasks which handles job ttl

Signed-off-by: Dejan Zele Pejchev
---
 pkg/health/health.go     |  13 ++++-
 pkg/sync/sync_context.go | 102 +++++++++++++++++++++++++++++++++++++++
 pkg/sync/sync_task.go    |   2 +
 3 files changed, 116 insertions(+), 1 deletion(-)

diff --git a/pkg/health/health.go b/pkg/health/health.go
index b93d8c967..6cb37e879 100644
--- a/pkg/health/health.go
+++ b/pkg/health/health.go
@@ -64,7 +64,7 @@ func IsWorse(current, new HealthStatusCode) bool {
 
 // GetResourceHealth returns the health of a k8s resource
 func GetResourceHealth(obj *unstructured.Unstructured, healthOverride HealthOverride) (health *HealthStatus, err error) {
-	if obj.GetDeletionTimestamp() != nil {
+	if obj.GetDeletionTimestamp() != nil && !hasHookFinalizer(obj) {
 		return &HealthStatus{
 			Status:  HealthStatusProgressing,
 			Message: "Pending deletion",
@@ -97,6 +97,17 @@ func GetResourceHealth(obj *unstructured.Unstructured, healthOverride HealthOver
 	}
 }
 
+func hasHookFinalizer(obj *unstructured.Unstructured) bool {
+	hookFinalizer := "argoproj.io/hook-finalizer"
+	finalizers := obj.GetFinalizers()
+	for _, finalizer := range finalizers {
+		if finalizer == hookFinalizer {
+			return true
+		}
+	}
+	return false
+}
+
 // GetHealthCheckFunc returns built-in health check function or nil if health check is not supported
 func GetHealthCheckFunc(gvk schema.GroupVersionKind) func(obj *unstructured.Unstructured) (*HealthStatus, error) {
 	switch gvk.Group {
diff --git a/pkg/sync/sync_context.go b/pkg/sync/sync_context.go
index 35981ebaa..f01b34795 100644
--- a/pkg/sync/sync_context.go
+++ b/pkg/sync/sync_context.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"k8s.io/client-go/kubernetes"
 	"sort"
 	"strings"
 	"sync"
@@ -228,6 +229,10 @@ func NewSyncContext(
 	if err != nil {
 		return nil, nil, err
 	}
+	clientset, err := kubernetes.NewForConfig(restConfig)
+	if err != nil {
+		return nil, nil, err
+	}
 	ctx := &syncContext{
 		revision:            revision,
 		resources:           groupResources(reconciliationResult),
@@ -236,6 +241,7 @@ func NewSyncContext(
 		rawConfig:           rawConfig,
 		dynamicIf:           dynamicIf,
 		disco:               disco,
+		clientset:           clientset,
 		extensionsclientset: extensionsclientset,
 		kubectl:             kubectl,
 		resourceOps:         resourceOps,
@@ -331,6 +337,7 @@ type syncContext struct {
 	dynamicIf           dynamic.Interface
 	disco               discovery.DiscoveryInterface
 	extensionsclientset *clientset.Clientset
+	clientset           *kubernetes.Clientset
 	kubectl             kube.Kubectl
 	resourceOps         kube.ResourceOperations
 	namespace           string
@@ -476,6 +483,17 @@ func (sc *syncContext) Sync() {
 		return
 	}
 
+	hooksCompleted := tasks.Filter(func(task *syncTask) bool {
+		return task.isHook() && task.completed()
+	})
+	for _, task := range hooksCompleted {
+		if task.cleanup != nil {
+			if err := task.cleanup(); err != nil {
+				sc.log.V(1).Error(err, "failed to run hook task cleanup")
+			}
+		}
+	}
+
 	// collect all completed hooks which have appropriate delete policy
 	hooksPendingDeletionSuccessful := tasks.Filter(func(task *syncTask) bool {
 		return task.isHook() && task.liveObj != nil && !task.running() && task.deleteOnPhaseSuccessful()
@@ -688,6 +706,8 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
 
 	sc.log.WithValues("hookTasks", hookTasks).V(1).Info("tasks from hooks")
 
+	sc.processHookTasks(hookTasks)
+
 	tasks := resourceTasks
 	tasks = append(tasks, hookTasks...)
 
@@ -830,6 +850,83 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
 	return tasks, successful
 }
 
+// processHookTasks applies additional logic to hook tasks.
+func (sc *syncContext) processHookTasks(tasks syncTasks) {
+	for _, task := range tasks {
+		// This is a safety check to ensure that we process only currently running hook tasks.
+		if !task.isHook() || !task.pending() {
+			continue
+		}
+		// Safety check to ensure that the target object is not nil.
+		if task.targetObj == nil {
+			continue
+		}
+		// Currently, we only process hook tasks where the target object is a Job.
+		if task.targetObj.GetKind() == "Job" {
+			sc.processJobHookTask(task)
+		}
+	}
+}
+
+// processJobHookTask processes a hook task where the target object is a Job and has defined ttlSecondsAfterFinished.
+// This addresses the issue where a Job with a ttlSecondsAfterFinished set to a low value gets deleted fast and the hook phase gets stuck.
+// For more info, see issue https://github.com/argoproj/argo-cd/issues/6880
+func (sc *syncContext) processJobHookTask(task *syncTask) {
+	hookFinalizer := "argoproj.io/hook-finalizer"
+
+	task.postprocess = func() error {
+		sc.log.V(1).Info("Processing hook task with a Job resource - attaching hook finalizer", "name", task.targetObj.GetName(), "namespace", task.targetObj.GetNamespace())
+
+		job, err := sc.clientset.BatchV1().Jobs(task.targetObj.GetNamespace()).Get(context.TODO(), task.targetObj.GetName(), metav1.GetOptions{})
+		if err != nil {
+			return err
+		}
+
+		// Skip postprocessing if the Job does not have a ttlSecondsAfterFinished set.
+		if job.Spec.TTLSecondsAfterFinished == nil {
+			return nil
+		}
+		// Attach the hook finalizer to the Job resource so it does not get deleted before the sync phase is marked as completed.
+		job.Finalizers = append(job.Finalizers, hookFinalizer)
+
+		_, err = sc.clientset.
+			BatchV1().
+			Jobs(job.Namespace).
+			Update(context.TODO(), job, metav1.UpdateOptions{})
+		if err != nil {
+			return err
+		}
+		return nil
+	}
+
+	task.cleanup = func() error {
+		sc.log.V(1).Info("Cleaning up hook task with a Job resource - removing hook finalizer", "name", task.targetObj.GetName(), "namespace", task.targetObj.GetNamespace())
+
+		job, err := sc.clientset.BatchV1().Jobs(task.targetObj.GetNamespace()).Get(context.TODO(), task.targetObj.GetName(), metav1.GetOptions{})
+		if err != nil {
+			return err
+		}
+
+		// Remove the hook finalizer from the Job resource.
+		var filtered []string
+		for _, s := range job.Finalizers {
+			if s != hookFinalizer {
+				filtered = append(filtered, s)
+			}
+		}
+		job.Finalizers = filtered
+
+		_, err = sc.clientset.
+			BatchV1().
+			Jobs(job.Namespace).
+			Update(context.TODO(), job, metav1.UpdateOptions{})
+		if err != nil {
+			return err
+		}
+		return nil
+	}
+}
+
 func (sc *syncContext) autoCreateNamespace(tasks syncTasks) syncTasks {
 	isNamespaceCreationNeeded := true
 
@@ -1007,6 +1104,11 @@ func (sc *syncContext) applyObject(t *syncTask, dryRun, validate bool) (common.R
 	if err != nil {
 		return common.ResultCodeSyncFailed, err.Error()
 	}
+	if t.postprocess != nil && !dryRun {
+		if err := t.postprocess(); err != nil {
+			sc.log.Error(err, "failed to call postprocess function for task")
+		}
+	}
 	if kube.IsCRD(t.targetObj) && !dryRun {
 		crdName := t.targetObj.GetName()
 		if err = sc.ensureCRDReady(crdName); err != nil {
diff --git a/pkg/sync/sync_task.go b/pkg/sync/sync_task.go
index 01c67a98b..289993ba7 100644
--- a/pkg/sync/sync_task.go
+++ b/pkg/sync/sync_task.go
@@ -24,6 +24,8 @@ type syncTask struct {
 	operationState common.OperationPhase
 	message        string
 	waveOverride   *int
+	postprocess    func() error
+	cleanup        func() error
 }
 
 func ternary(val bool, a, b string) string {
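
A minimal, self-contained sketch of the finalizer round-trip that the new postprocess and cleanup callbacks perform on a TTL-controlled hook Job; it is not part of the patch above. It uses the fake clientset from k8s.io/client-go so it runs standalone, and the Job name "db-migrate", its namespace, and the TTL value are illustrative assumptions rather than values taken from the patch.

package main

import (
	"context"
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

const hookFinalizer = "argoproj.io/hook-finalizer"

func main() {
	ttl := int32(10)
	// A hook Job with a low ttlSecondsAfterFinished; the TTL controller would
	// normally delete it shortly after completion, racing the sync phase.
	job := &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{Name: "db-migrate", Namespace: "default"},
		Spec:       batchv1.JobSpec{TTLSecondsAfterFinished: &ttl},
	}
	cs := fake.NewSimpleClientset(job)
	ctx := context.TODO()
	jobs := cs.BatchV1().Jobs("default")

	// postprocess step: attach the hook finalizer right after the Job is
	// applied, so it cannot be garbage-collected before the sync phase ends.
	j, _ := jobs.Get(ctx, "db-migrate", metav1.GetOptions{})
	j.Finalizers = append(j.Finalizers, hookFinalizer)
	_, _ = jobs.Update(ctx, j, metav1.UpdateOptions{})

	// cleanup step: strip the finalizer once the hook task has completed,
	// letting the TTL controller delete the Job as usual.
	j, _ = jobs.Get(ctx, "db-migrate", metav1.GetOptions{})
	var kept []string
	for _, f := range j.Finalizers {
		if f != hookFinalizer {
			kept = append(kept, f)
		}
	}
	j.Finalizers = kept
	j, _ = jobs.Update(ctx, j, metav1.UpdateOptions{})
	fmt.Println("finalizers after cleanup:", j.Finalizers)
}

In the patch itself the same Get/append/Update and Get/filter/Update steps run against the real clientset: applyObject invokes task.postprocess after a non-dry-run apply, and Sync invokes task.cleanup once the hook task has completed.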