Skip to content

Commit

Permalink
add postprocessing logic for sync tasks which handles job ttl
Browse files Browse the repository at this point in the history
Signed-off-by: Dejan Zele Pejchev <[email protected]>
  • Loading branch information
dejanzele committed Dec 11, 2024
1 parent 8849c3f commit 27d7edc
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 1 deletion.
13 changes: 12 additions & 1 deletion pkg/health/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func IsWorse(current, new HealthStatusCode) bool {

// GetResourceHealth returns the health of a k8s resource
func GetResourceHealth(obj *unstructured.Unstructured, healthOverride HealthOverride) (health *HealthStatus, err error) {
if obj.GetDeletionTimestamp() != nil {
if obj.GetDeletionTimestamp() != nil && !hasHookFinalizer(obj) {
return &HealthStatus{
Status: HealthStatusProgressing,
Message: "Pending deletion",
Expand Down Expand Up @@ -97,6 +97,17 @@ func GetResourceHealth(obj *unstructured.Unstructured, healthOverride HealthOver

}

func hasHookFinalizer(obj *unstructured.Unstructured) bool {
hookFinalizer := "argoproj.io/hook-finalizer"
finalizers := obj.GetFinalizers()
for _, finalizer := range finalizers {
if finalizer == hookFinalizer {
return true
}
}
return false
}

// GetHealthCheckFunc returns built-in health check function or nil if health check is not supported
func GetHealthCheckFunc(gvk schema.GroupVersionKind) func(obj *unstructured.Unstructured) (*HealthStatus, error) {
switch gvk.Group {
Expand Down
102 changes: 102 additions & 0 deletions pkg/sync/sync_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"k8s.io/client-go/kubernetes"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -228,6 +229,10 @@ func NewSyncContext(
if err != nil {
return nil, nil, err
}
clientset, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, nil, err
}
ctx := &syncContext{
revision: revision,
resources: groupResources(reconciliationResult),
Expand All @@ -236,6 +241,7 @@ func NewSyncContext(
rawConfig: rawConfig,
dynamicIf: dynamicIf,
disco: disco,
clientset: clientset,
extensionsclientset: extensionsclientset,
kubectl: kubectl,
resourceOps: resourceOps,
Expand Down Expand Up @@ -331,6 +337,7 @@ type syncContext struct {
dynamicIf dynamic.Interface
disco discovery.DiscoveryInterface
extensionsclientset *clientset.Clientset
clientset *kubernetes.Clientset
kubectl kube.Kubectl
resourceOps kube.ResourceOperations
namespace string
Expand Down Expand Up @@ -476,6 +483,17 @@ func (sc *syncContext) Sync() {
return
}

hooksCompleted := tasks.Filter(func(task *syncTask) bool {
return task.isHook() && task.completed()
})
for _, task := range hooksCompleted {
if task.cleanup != nil {
if err := task.cleanup(); err != nil {
sc.log.V(1).Error(err, "failed to run hook task cleanup")
}
}
}

// collect all completed hooks which have appropriate delete policy
hooksPendingDeletionSuccessful := tasks.Filter(func(task *syncTask) bool {
return task.isHook() && task.liveObj != nil && !task.running() && task.deleteOnPhaseSuccessful()
Expand Down Expand Up @@ -688,6 +706,8 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {

sc.log.WithValues("hookTasks", hookTasks).V(1).Info("tasks from hooks")

sc.processHookTasks(hookTasks)

tasks := resourceTasks
tasks = append(tasks, hookTasks...)

Expand Down Expand Up @@ -830,6 +850,83 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
return tasks, successful
}

// processHookTasks applies additional logic to hook tasks.
func (sc *syncContext) processHookTasks(tasks syncTasks) {
for _, task := range tasks {
// This is a safety check to ensure that we process only currently running hook tasks.
if !task.isHook() || !task.pending() {
continue
}
// Safety check to ensure that the target object is not nil.
if task.targetObj == nil {
continue
}
// Currently, we only process hook tasks where the target object is a Job.
if task.targetObj.GetKind() == "Job" {
sc.processJobHookTask(task)
}
}
}

// processJobHookTask processes a hook task where the target object is a Job and has defined ttlSecondsAfterFinished.
// This addresses the issue where a Job with a ttlSecondsAfterFinished set to a low value gets deleted fast and the hook phase gets stuck.
// For more info, see issue https://github.com/argoproj/argo-cd/issues/6880
func (sc *syncContext) processJobHookTask(task *syncTask) {
hookFinalizer := "argoproj.io/hook-finalizer"

task.postprocess = func() error {
sc.log.V(1).Info("Processing hook task with a Job resource - attaching hook finalizer", "name", task.targetObj.GetName(), "namespace", task.targetObj.GetNamespace())

job, err := sc.clientset.BatchV1().Jobs(task.targetObj.GetNamespace()).Get(context.TODO(), task.targetObj.GetName(), metav1.GetOptions{})
if err != nil {
return err
}

// Skip postprocessing if the Job does not have a ttlSecondsAfterFinished set.
if job.Spec.TTLSecondsAfterFinished == nil {
return nil
}
// Attach the hook finalizer to the Job resource so it does not get deleted before the sync phase is marked as completed.
job.Finalizers = append(job.Finalizers, hookFinalizer)

_, err = sc.clientset.
BatchV1().
Jobs(job.Namespace).
Update(context.TODO(), job, metav1.UpdateOptions{})
if err != nil {
return err
}
return nil
}

task.cleanup = func() error {
sc.log.V(1).Info("Cleaning up hook task with a Job resource - removing hook finalizer", "name", task.targetObj.GetName(), "namespace", task.targetObj.GetNamespace())

job, err := sc.clientset.BatchV1().Jobs(task.targetObj.GetNamespace()).Get(context.TODO(), task.targetObj.GetName(), metav1.GetOptions{})
if err != nil {
return err
}

// Remove the hook finalizer from the Job resource.
var filtered []string
for _, s := range job.Finalizers {
if s != hookFinalizer {
filtered = append(filtered, s)
}
}
job.Finalizers = filtered

_, err = sc.clientset.
BatchV1().
Jobs(job.Namespace).
Update(context.TODO(), job, metav1.UpdateOptions{})
if err != nil {
return err
}
return nil
}
}

func (sc *syncContext) autoCreateNamespace(tasks syncTasks) syncTasks {
isNamespaceCreationNeeded := true

Expand Down Expand Up @@ -1007,6 +1104,11 @@ func (sc *syncContext) applyObject(t *syncTask, dryRun, validate bool) (common.R
if err != nil {
return common.ResultCodeSyncFailed, err.Error()
}
if t.postprocess != nil && !dryRun {
if err := t.postprocess(); err != nil {
sc.log.Error(err, "failed to call postprocess function for task")
}
}
if kube.IsCRD(t.targetObj) && !dryRun {
crdName := t.targetObj.GetName()
if err = sc.ensureCRDReady(crdName); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sync/sync_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ type syncTask struct {
operationState common.OperationPhase
message string
waveOverride *int
postprocess func() error
cleanup func() error
}

func ternary(val bool, a, b string) string {
Expand Down

0 comments on commit 27d7edc

Please sign in to comment.