diff --git a/pkg/sync/sync_context.go b/pkg/sync/sync_context.go index abd4379f7..55489e15d 100644 --- a/pkg/sync/sync_context.go +++ b/pkg/sync/sync_context.go @@ -30,7 +30,7 @@ import ( "github.com/argoproj/gitops-engine/pkg/diff" "github.com/argoproj/gitops-engine/pkg/health" "github.com/argoproj/gitops-engine/pkg/sync/common" - "github.com/argoproj/gitops-engine/pkg/sync/hook" + hookutil "github.com/argoproj/gitops-engine/pkg/sync/hook" resourceutil "github.com/argoproj/gitops-engine/pkg/sync/resource" "github.com/argoproj/gitops-engine/pkg/utils/kube" kubeutil "github.com/argoproj/gitops-engine/pkg/utils/kube" @@ -290,6 +290,23 @@ const ( // getOperationPhase returns a hook status from an _live_ unstructured object func (sc *syncContext) getOperationPhase(hook *unstructured.Unstructured) (common.OperationPhase, string, error) { + // start by detecting resources that: + // 1. have BeforeHookCreation deletion policies + // 2. were already deleted from the cluster + // 3. DELETE watch event from kubernetes control plane was not processed yet, + // this can happen under high load of controller and/or k8s control plane + // This results in old version still being present in cache and prematurely ending the sync wave, + // it is fixed by verifying creationTimestamp against Sync's start date + // fixes https://github.com/argoproj/gitops-engine/issues/446 + // related to artificial sync wave delays in ArgoCD: + // https://github.com/argoproj/argo-cd/blob/9fac0f6ae6e52d6f4978a1eaaf51fbffb9c0958a/controller/sync.go#L465-L485 + for _, policy := range hookutil.DeletePolicies(hook) { + if policy == common.HookDeletePolicyBeforeHookCreation && sc.startedAt.After(hook.GetCreationTimestamp().Time) { + key := kube.GetResourceKey(hook) + return common.OperationRunning, fmt.Sprintf("%s is recreating", key.String()), nil + } + } + phase := common.OperationSucceeded message := fmt.Sprintf("%s created", hook.GetName()) @@ -627,7 +644,7 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) { obj := obj(resource.Target, resource.Live) // this creates garbage tasks - if hook.IsHook(obj) { + if hookutil.IsHook(obj) { sc.log.WithValues("group", obj.GroupVersionKind().Group, "kind", obj.GetKind(), "namespace", obj.GetNamespace(), "name", obj.GetName()).V(1).Info("Skipping hook") continue } diff --git a/pkg/sync/sync_context_test.go b/pkg/sync/sync_context_test.go index 6d3832279..f3aef7599 100644 --- a/pkg/sync/sync_context_test.go +++ b/pkg/sync/sync_context_test.go @@ -8,6 +8,7 @@ import ( "net/http/httptest" "reflect" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -25,7 +26,6 @@ import ( "github.com/argoproj/gitops-engine/pkg/diff" "github.com/argoproj/gitops-engine/pkg/health" - "github.com/argoproj/gitops-engine/pkg/sync/common" synccommon "github.com/argoproj/gitops-engine/pkg/sync/common" "github.com/argoproj/gitops-engine/pkg/utils/kube" "github.com/argoproj/gitops-engine/pkg/utils/kube/kubetest" @@ -568,7 +568,7 @@ func TestServerResourcesRetry(t *testing.T) { apiFailureCount: 0, expectedAPICalls: 1, expectedResources: 1, - expectedPhase: common.OperationSucceeded, + expectedPhase: synccommon.OperationSucceeded, expectedMessage: "success", }, { @@ -576,7 +576,7 @@ func TestServerResourcesRetry(t *testing.T) { apiFailureCount: 1, expectedAPICalls: 2, expectedResources: 1, - expectedPhase: common.OperationSucceeded, + expectedPhase: synccommon.OperationSucceeded, expectedMessage: "success", }, { @@ -584,7 +584,7 @@ func TestServerResourcesRetry(t *testing.T) { apiFailureCount: 2, expectedAPICalls: 3, expectedResources: 1, - expectedPhase: common.OperationSucceeded, + expectedPhase: synccommon.OperationSucceeded, expectedMessage: "success", }, { @@ -592,7 +592,7 @@ func TestServerResourcesRetry(t *testing.T) { apiFailureCount: 3, expectedAPICalls: 4, expectedResources: 1, - expectedPhase: common.OperationSucceeded, + expectedPhase: synccommon.OperationSucceeded, expectedMessage: "success", }, { @@ -600,7 +600,7 @@ func TestServerResourcesRetry(t *testing.T) { apiFailureCount: 4, expectedAPICalls: 5, expectedResources: 1, - expectedPhase: common.OperationSucceeded, + expectedPhase: synccommon.OperationSucceeded, expectedMessage: "success", }, { @@ -608,7 +608,7 @@ func TestServerResourcesRetry(t *testing.T) { apiFailureCount: 5, expectedAPICalls: 5, expectedResources: 1, - expectedPhase: common.OperationFailed, + expectedPhase: synccommon.OperationFailed, expectedMessage: "not valid", }, { @@ -617,7 +617,7 @@ func TestServerResourcesRetry(t *testing.T) { apiFailureCount: 1, expectedAPICalls: 1, expectedResources: 1, - expectedPhase: common.OperationFailed, + expectedPhase: synccommon.OperationFailed, expectedMessage: "not valid", }, } @@ -1029,8 +1029,17 @@ func TestSyncFailureHookWithFailedSync(t *testing.T) { func TestBeforeHookCreation(t *testing.T) { syncCtx := newTestSyncCtx() - hook := Annotate(Annotate(NewPod(), synccommon.AnnotationKeyHook, "Sync"), synccommon.AnnotationKeyHookDeletePolicy, "BeforeHookCreation") + + syncCtx.startedAt = time.Date(2022, 9, 14, 0, 0, 0, 0, time.UTC) + previousCreatedAt := syncCtx.startedAt.Add(-time.Hour) + newCreatedAt := syncCtx.startedAt.Add(time.Second) + + hook := NewPod() hook.SetNamespace(FakeArgoCDNamespace) + hook = Annotate(hook, synccommon.AnnotationKeyHook, string(synccommon.HookTypePreSync)) + hook = Annotate(hook, synccommon.AnnotationKeyHookDeletePolicy, string(synccommon.HookDeletePolicyBeforeHookCreation)) + hook.SetCreationTimestamp(metav1.NewTime(previousCreatedAt)) + syncCtx.resources = groupResources(ReconciliationResult{ Live: []*unstructured.Unstructured{hook}, Target: []*unstructured.Unstructured{nil}, @@ -1038,12 +1047,42 @@ func TestBeforeHookCreation(t *testing.T) { syncCtx.hooks = []*unstructured.Unstructured{hook} syncCtx.dynamicIf = fake.NewSimpleDynamicClient(runtime.NewScheme()) + // Should delete and recreate Pod, but not set status on hook syncCtx.Sync() - - _, _, resources := syncCtx.GetState() + phase, message, resources := syncCtx.GetState() + assert.Equal(t, synccommon.OperationRunning, phase) assert.Len(t, resources, 1) assert.Empty(t, resources[0].Message) - assert.Equal(t, "waiting for completion of hook /Pod/my-pod", syncCtx.message) + assert.Equal(t, "waiting for completion of hook /Pod/my-pod", message) + + // Should mark hook as running, because fresh object was not registered yet + syncCtx.Sync() + phase, message, resources = syncCtx.GetState() + assert.Equal(t, synccommon.OperationRunning, phase) + assert.Len(t, resources, 1) + assert.Equal(t, "/Pod/fake-argocd-ns/my-pod is recreating", resources[0].Message) + assert.Equal(t, "waiting for completion of hook /Pod/my-pod", message) + + // fresh hook was registered in Pending state, so should still be running + hook.SetCreationTimestamp(metav1.NewTime(newCreatedAt)) + assert.Nil(t, unstructured.SetNestedField(hook.Object, string(corev1.PodPending), "status", "phase")) + syncCtx.Sync() + phase, message, resources = syncCtx.GetState() + assert.Equal(t, synccommon.OperationRunning, phase) + assert.Len(t, resources, 1) + assert.Equal(t, "/Pod/fake-argocd-ns/my-pod is recreating", resources[0].Message) + assert.Equal(t, "waiting for completion of hook /Pod/my-pod", message) + + // hook finished so should succeed + statusMessage := "finished" + assert.Nil(t, unstructured.SetNestedField(hook.Object, string(corev1.PodSucceeded), "status", "phase")) + assert.Nil(t, unstructured.SetNestedField(hook.Object, statusMessage, "status", "message")) + syncCtx.Sync() + phase, message, resources = syncCtx.GetState() + assert.Equal(t, synccommon.OperationSucceeded, phase) + assert.Len(t, resources, 1) + assert.Equal(t, statusMessage, resources[0].Message) + assert.Equal(t, "successfully synced (no more tasks)", message) } func TestRunSyncFailHooksFailed(t *testing.T) {