From 9e78f1c71a2dfe1dae3ffdf03abc5b0777ff80f5 Mon Sep 17 00:00:00 2001
From: Peter Jiang
Date: Wed, 9 Jul 2025 17:19:18 -0700
Subject: [PATCH 1/6] fix: invalidate cluster cache after sync operations

Signed-off-by: Peter Jiang
---
 pkg/engine/engine.go     |  7 +++++++
 pkg/sync/sync_context.go | 17 +++++++++++++++++
 2 files changed, 24 insertions(+)

diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go
index 01e5561b1..6cc67190c 100644
--- a/pkg/engine/engine.go
+++ b/pkg/engine/engine.go
@@ -84,6 +84,13 @@ func (e *gitOpsEngine) Sync(ctx context.Context,
 		return nil, fmt.Errorf("failed to diff objects: %w", err)
 	}
 	opts = append(opts, sync.WithSkipHooks(!diffRes.Modified))
+
+	// Add cache invalidation callback to invalidate cache for modified resources after sync
+	opts = append(opts, sync.WithCacheInvalidationCallback(func() {
+		// Invalidate the entire cache to ensure consistency
+		e.cache.Invalidate()
+	}))
+
 	syncCtx, cleanup, err := sync.NewSyncContext(revision, result, e.config, e.config, e.kubectl, namespace, e.cache.GetOpenAPISchema(), opts...)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create sync context: %w", err)
diff --git a/pkg/sync/sync_context.go b/pkg/sync/sync_context.go
index 8f4d51e4f..26ce33d40 100644
--- a/pkg/sync/sync_context.go
+++ b/pkg/sync/sync_context.go
@@ -218,6 +218,14 @@ func WithClientSideApplyMigration(enabled bool, manager string) SyncOpt {
 	}
 }
 
+// WithCacheInvalidationCallback sets a callback that will be invoked after successful sync operations
+// to invalidate the cache
+func WithCacheInvalidationCallback(callback func()) SyncOpt {
+	return func(ctx *syncContext) {
+		ctx.cacheInvalidationCallback = callback
+	}
+}
+
 // NewSyncContext creates new instance of a SyncContext
 func NewSyncContext(
 	revision string,
@@ -389,6 +397,10 @@ type syncContext struct {
 	applyOutOfSyncOnly bool
 	// stores whether the resource is modified or not
 	modificationResult map[kubeutil.ResourceKey]bool
+
+	// cacheInvalidationCallback is a callback that will be invoked after successful sync operations
+	// to invalidate the cache
+	cacheInvalidationCallback func()
 }
 
 func (sc *syncContext) setRunningPhase(tasks []*syncTask, isPendingDeletion bool) {
@@ -598,6 +610,11 @@ func (sc *syncContext) Sync() {
 			// delete all completed hooks which have appropriate delete policy
 			sc.deleteHooks(hooksPendingDeletionSuccessful)
 			sc.setOperationPhase(common.OperationSucceeded, "successfully synced (all tasks run)")
+
+			// Invalidate cache after successful sync
+			if sc.cacheInvalidationCallback != nil {
+				sc.cacheInvalidationCallback()
+			}
 		} else {
 			sc.setRunningPhase(remainingTasks, false)
 		}

From 6cdd21650888211faf81271fa23d434150410b8f Mon Sep 17 00:00:00 2001
From: Peter Jiang
Date: Wed, 9 Jul 2025 19:17:53 -0700
Subject: [PATCH 2/6] add ensureSynced to check if cache is stale

Signed-off-by: Peter Jiang
---
 pkg/engine/engine.go | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go
index 6cc67190c..0ff449258 100644
--- a/pkg/engine/engine.go
+++ b/pkg/engine/engine.go
@@ -74,6 +74,13 @@ func (e *gitOpsEngine) Sync(ctx context.Context,
 	namespace string,
 	opts ...sync.SyncOpt,
 ) ([]common.ResourceSyncResult, error) {
+	// Ensure cache is synced before getting managed live objects
+	// This forces a refresh if the cache was invalidated
+	err := e.cache.EnsureSynced()
+	if err != nil {
+		return nil, fmt.Errorf("failed to ensure cache is synced: %w", err)
+	}
+
 	managedResources, err := e.cache.GetManagedLiveObjs(resources, isManaged)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get managed live objects: %w", err)

From 606273ed96c7f961c074c351bc240bcb81aabe0d Mon Sep 17 00:00:00 2001
From: Peter Jiang
Date: Thu, 10 Jul 2025 16:51:25 -0700
Subject: [PATCH 3/6] Invalidate only modified resources during sync

Signed-off-by: Peter Jiang
---
 pkg/cache/cluster.go          |  25 +++++
 pkg/cache/cluster_test.go     |  94 ++++++++++++++++
 pkg/engine/engine.go          |   8 +-
 pkg/sync/sync_context.go      |  33 +++++-
 pkg/sync/sync_context_test.go | 194 ++++++++++++++++++++++++++++++++++
 5 files changed, 348 insertions(+), 6 deletions(-)

diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go
index bc86e8b98..958543c51 100644
--- a/pkg/cache/cluster.go
+++ b/pkg/cache/cluster.go
@@ -128,6 +128,8 @@ type ClusterCache interface {
 	GetGVKParser() *managedfields.GvkParser
 	// Invalidate cache and executes callback that optionally might update cache settings
 	Invalidate(opts ...UpdateSettingsFunc)
+	// InvalidateResources invalidates specific resources in the cache
+	InvalidateResources(keys []kube.ResourceKey)
 	// FindResources returns resources that matches given list of predicates from specified namespace or everywhere if specified namespace is empty
 	FindResources(namespace string, predicates ...func(r *Resource) bool) map[kube.ResourceKey]*Resource
 	// IterateHierarchy iterates resource tree starting from the specified top level resource and executes callback for each resource in the tree.
@@ -494,6 +496,29 @@ func (c *clusterCache) Invalidate(opts ...UpdateSettingsFunc) {
 	c.log.Info("Invalidated cluster")
 }
 
+// InvalidateResources invalidates specific resources in the cache by removing them.
+// This forces the resources to be refreshed from the cluster on the next access.
+func (c *clusterCache) InvalidateResources(keys []kube.ResourceKey) {
+	if len(keys) == 0 {
+		return
+	}
+
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	for _, key := range keys {
+		if _, exists := c.resources[key]; exists {
+			// Remove the resource from cache - this will force it to be refreshed on next access
+			c.onNodeRemoved(key)
+			c.log.Info("Invalidated resource from cache", "key", key.String())
+		} else {
+			c.log.Info("Resource not found in cache for invalidation", "key", key.String())
+		}
+	}
+
+	c.log.Info("Invalidated specific resources from cache", "count", len(keys))
+}
+
 // clusterCacheSync's lock should be held before calling this method
 func (syncStatus *clusterCacheSync) synced(clusterRetryTimeout time.Duration) bool {
 	syncTime := syncStatus.syncTime
diff --git a/pkg/cache/cluster_test.go b/pkg/cache/cluster_test.go
index 8aa286fc4..36b5a1b7e 100644
--- a/pkg/cache/cluster_test.go
+++ b/pkg/cache/cluster_test.go
@@ -1393,3 +1393,97 @@ func BenchmarkIterateHierarchyV2(b *testing.B) {
 //	})
 //	}
 //}
+
+func TestInvalidateResources(t *testing.T) {
+	// Create test resources
+	pod1 := testPod1()
+	pod2 := testPod2()
+	rs := testRS()
+	deploy := testDeploy()
+
+	// Test invalidating specific resources
+	t.Run("InvalidateSpecificResources", func(t *testing.T) {
+		cluster := newCluster(t, pod1, pod2, rs, deploy)
+		err := cluster.EnsureSynced()
+		require.NoError(t, err)
+
+		// Verify all resources are initially in cache
+		initialResources := cluster.resources
+		require.Len(t, initialResources, 4)
+
+		keysToInvalidate := []kube.ResourceKey{
+			kube.GetResourceKey(mustToUnstructured(pod1)),
+			kube.GetResourceKey(mustToUnstructured(pod2)),
+		}
+
+		cluster.InvalidateResources(keysToInvalidate)
+
+		// Verify the specific resources were removed from cache
+		for _, key := range keysToInvalidate {
+			_, exists := cluster.resources[key]
+			assert.False(t, exists, "Resource %s should have been invalidated", key.String())
+		}
+
+		// Verify other resources remain in cache
+		rsKey := kube.GetResourceKey(mustToUnstructured(rs))
+		deployKey := kube.GetResourceKey(mustToUnstructured(deploy))
+
+		_, exists := cluster.resources[rsKey]
+		assert.True(t, exists, "Resource %s should still be in cache", rsKey.String())
+
+		_, exists = cluster.resources[deployKey]
+		assert.True(t, exists, "Resource %s should still be in cache", deployKey.String())
+	})
+
+	t.Run("InvalidateEmptyList", func(t *testing.T) {
+		cluster := newCluster(t, pod1, pod2, rs, deploy)
+		err := cluster.EnsureSynced()
+		require.NoError(t, err)
+
+		initialCount := len(cluster.resources)
+		cluster.InvalidateResources([]kube.ResourceKey{})
+
+		// Verify no resources were removed
+		assert.Equal(t, initialCount, len(cluster.resources), "No resources should have been invalidated")
+	})
+
+	t.Run("InvalidateNonExistentResources", func(t *testing.T) {
+		cluster := newCluster(t, pod1, pod2, rs, deploy)
+		err := cluster.EnsureSynced()
+		require.NoError(t, err)
+
+		initialCount := len(cluster.resources)
+		nonExistentKey := kube.ResourceKey{
+			Group:     "apps",
+			Kind:      "Deployment",
+			Namespace: "non-existent",
+			Name:      "non-existent-deploy",
+		}
+
+		cluster.InvalidateResources([]kube.ResourceKey{nonExistentKey})
+
+		// Verify no resources were removed
+		assert.Equal(t, initialCount, len(cluster.resources), "No resources should have been invalidated")
+	})
+
+	t.Run("InvalidateResourcesUpdatesNamespaceIndex", func(t *testing.T) {
+		cluster := newCluster(t, pod1, pod2, rs, deploy)
+		err := cluster.EnsureSynced()
+		require.NoError(t, err)
+
+		pod1Key := kube.GetResourceKey(mustToUnstructured(pod1))
+
+		// Verify resource is in namespace index
+		nsResources := cluster.nsIndex[pod1Key.Namespace]
+		_, exists := nsResources[pod1Key]
+		assert.True(t, exists, "Resource should exist in namespace index")
+
+		// Invalidate the resource
+		cluster.InvalidateResources([]kube.ResourceKey{pod1Key})
+
+		// Verify resource is removed from namespace index
+		nsResources = cluster.nsIndex[pod1Key.Namespace]
+		_, exists = nsResources[pod1Key]
+		assert.False(t, exists, "Resource should be removed from namespace index")
+	})
+}
diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go
index 0ff449258..e0fd6b92c 100644
--- a/pkg/engine/engine.go
+++ b/pkg/engine/engine.go
@@ -93,9 +93,11 @@ func (e *gitOpsEngine) Sync(ctx context.Context,
 	opts = append(opts, sync.WithSkipHooks(!diffRes.Modified))
 
 	// Add cache invalidation callback to invalidate cache for modified resources after sync
-	opts = append(opts, sync.WithCacheInvalidationCallback(func() {
-		// Invalidate the entire cache to ensure consistency
-		e.cache.Invalidate()
+	opts = append(opts, sync.WithCacheInvalidationCallback(func(modifiedResources []kube.ResourceKey) {
+		// Only invalidate the specific resources that were modified
+		if len(modifiedResources) > 0 {
+			e.cache.InvalidateResources(modifiedResources)
+		}
 	}))
 
 	syncCtx, cleanup, err := sync.NewSyncContext(revision, result, e.config, e.config, e.kubectl, namespace, e.cache.GetOpenAPISchema(), opts...)
diff --git a/pkg/sync/sync_context.go b/pkg/sync/sync_context.go
index 26ce33d40..cf66c47fb 100644
--- a/pkg/sync/sync_context.go
+++ b/pkg/sync/sync_context.go
@@ -220,7 +220,7 @@ func WithClientSideApplyMigration(enabled bool, manager string) SyncOpt {
 
 // WithCacheInvalidationCallback sets a callback that will be invoked after successful sync operations
 // to invalidate the cache
-func WithCacheInvalidationCallback(callback func()) SyncOpt {
+func WithCacheInvalidationCallback(callback func([]kubeutil.ResourceKey)) SyncOpt {
 	return func(ctx *syncContext) {
 		ctx.cacheInvalidationCallback = callback
 	}
@@ -400,7 +400,7 @@ type syncContext struct {
 
 	// cacheInvalidationCallback is a callback that will be invoked after successful sync operations
 	// to invalidate the cache
-	cacheInvalidationCallback func()
+	cacheInvalidationCallback func([]kubeutil.ResourceKey)
 }
 
 func (sc *syncContext) setRunningPhase(tasks []*syncTask, isPendingDeletion bool) {
@@ -569,6 +569,15 @@ func (sc *syncContext) Sync() {
 		// delete all completed hooks which have appropriate delete policy
 		sc.deleteHooks(hooksPendingDeletionSuccessful)
 		sc.setOperationPhase(common.OperationSucceeded, "successfully synced (no more tasks)")
+
+		// Invalidate cache after successful sync
+		if sc.cacheInvalidationCallback != nil {
+			modifiedResources := make([]kubeutil.ResourceKey, 0, len(sc.syncRes))
+			for _, result := range sc.syncRes {
+				modifiedResources = append(modifiedResources, result.ResourceKey)
+			}
+			sc.cacheInvalidationCallback(modifiedResources)
+		}
 		return
 	}
 
@@ -605,6 +614,20 @@ func (sc *syncContext) Sync() {
 		syncFailedTasks, _ := tasks.Split(func(t *syncTask) bool { return t.syncStatus == common.ResultCodeSyncFailed })
 		sc.deleteHooks(hooksPendingDeletionFailed)
 		sc.setOperationFailed(syncFailTasks, syncFailedTasks, "one or more objects failed to apply")
+
+		// Invalidate cache for successfully synced resources even if overall operation failed
+		if sc.cacheInvalidationCallback != nil {
+			var modifiedResources []kubeutil.ResourceKey
+			for _, result := range sc.syncRes {
+				// Only invalidate resources that were successfully synced
+				if result.Status == common.ResultCodeSynced {
+					modifiedResources = append(modifiedResources, result.ResourceKey)
+				}
+			}
+			if len(modifiedResources) > 0 {
+				sc.cacheInvalidationCallback(modifiedResources)
+			}
+		}
 	case successful:
 		if remainingTasks.Len() == 0 {
 			// delete all completed hooks which have appropriate delete policy
@@ -613,7 +636,11 @@ func (sc *syncContext) Sync() {
 
 			// Invalidate cache after successful sync
 			if sc.cacheInvalidationCallback != nil {
-				sc.cacheInvalidationCallback()
+				modifiedResources := make([]kubeutil.ResourceKey, 0, len(sc.syncRes))
+				for _, result := range sc.syncRes {
+					modifiedResources = append(modifiedResources, result.ResourceKey)
+				}
+				sc.cacheInvalidationCallback(modifiedResources)
 			}
 		} else {
 			sc.setRunningPhase(remainingTasks, false)
diff --git a/pkg/sync/sync_context_test.go b/pkg/sync/sync_context_test.go
index 0e8d01ebb..80dda11e3 100644
--- a/pkg/sync/sync_context_test.go
+++ b/pkg/sync/sync_context_test.go
@@ -36,6 +36,7 @@ import (
 	"github.com/argoproj/gitops-engine/pkg/utils/kube"
 	"github.com/argoproj/gitops-engine/pkg/utils/kube/kubetest"
 	testingutils "github.com/argoproj/gitops-engine/pkg/utils/testing"
+	cmdutil "k8s.io/kubectl/pkg/cmd/util"
 )
 
 var standardVerbs = metav1.Verbs{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"}
@@ -2262,3 +2263,196 @@ func TestNeedsClientSideApplyMigration(t *testing.T) {
 		})
 	}
 }
+
+func TestSyncContext_CacheInvalidationCallback(t *testing.T) {
+	// Track which resources are passed to the callback
+	var callbackInvoked bool
+	var invalidatedResources []kube.ResourceKey
+
+	callback := func(resources []kube.ResourceKey) {
+		callbackInvoked = true
+		invalidatedResources = append(invalidatedResources, resources...)
+	}
+
+	syncCtx := newTestSyncCtx(nil,
+		WithOperationSettings(false, false, false, false),
+		WithCacheInvalidationCallback(callback),
+	)
+
+	// Create test resources
+	pod := testingutils.NewPod()
+	pod.SetName("test-pod")
+	pod.SetNamespace(testingutils.FakeArgoCDNamespace)
+	service := testingutils.NewService()
+	service.SetName("test-service")
+	service.SetNamespace(testingutils.FakeArgoCDNamespace)
+
+	syncCtx.resources = groupResources(ReconciliationResult{
+		Live:   []*unstructured.Unstructured{nil, nil},
+		Target: []*unstructured.Unstructured{pod, service},
+	})
+
+	// Run sync
+	syncCtx.Sync()
+	phase, _, resources := syncCtx.GetState()
+
+	// Verify sync completed successfully
+	assert.Equal(t, synccommon.OperationSucceeded, phase)
+	assert.Len(t, resources, 2)
+
+	// Verify callback was invoked with the modified resources
+	assert.True(t, callbackInvoked, "Cache invalidation callback should have been invoked")
+	assert.Len(t, invalidatedResources, 2, "Should have invalidated 2 resources")
+
+	// Verify the correct resources were passed to the callback
+	expectedKeys := []kube.ResourceKey{
+		kube.GetResourceKey(pod),
+		kube.GetResourceKey(service),
+	}
+
+	for _, expectedKey := range expectedKeys {
+		assert.Contains(t, invalidatedResources, expectedKey, "Expected resource key should be in invalidated resources")
+	}
+}
+
+func TestSyncContext_CacheInvalidationCallback_NoResources(t *testing.T) {
+	// Track whether callback is invoked
+	var callbackInvoked bool
+	var invalidatedResources []kube.ResourceKey
+
+	callback := func(resources []kube.ResourceKey) {
+		callbackInvoked = true
+		invalidatedResources = append(invalidatedResources, resources...)
+	}
+
+	syncCtx := newTestSyncCtx(nil,
+		WithOperationSettings(false, false, false, false),
+		WithCacheInvalidationCallback(callback),
+	)
+
+	// No resources to sync
+	syncCtx.resources = groupResources(ReconciliationResult{
+		Live:   []*unstructured.Unstructured{},
+		Target: []*unstructured.Unstructured{},
+	})
+
+	// Run sync
+	syncCtx.Sync()
+	phase, _, resources := syncCtx.GetState()
+
+	// Verify sync completed successfully
+	assert.Equal(t, synccommon.OperationSucceeded, phase)
+	assert.Len(t, resources, 0)
+
+	// Verify callback was invoked with empty resource list
+	assert.True(t, callbackInvoked, "Cache invalidation callback should have been invoked")
+	assert.Len(t, invalidatedResources, 0, "Should have invalidated 0 resources")
+}
+
+func TestSyncContext_CacheInvalidationCallback_PartialFailure(t *testing.T) {
+	// Track which resources are passed to the callback
+	var callbackInvoked bool
+	var invalidatedResources []kube.ResourceKey
+
+	callback := func(resources []kube.ResourceKey) {
+		callbackInvoked = true
+		invalidatedResources = append(invalidatedResources, resources...)
+	}
+
+	syncCtx := newTestSyncCtx(nil,
+		WithOperationSettings(false, false, false, false),
+		WithCacheInvalidationCallback(callback),
+	)
+
+	// Create test resources - one will succeed, one will fail
+	pod := testingutils.NewPod()
+	pod.SetName("test-pod")
+	pod.SetNamespace(testingutils.FakeArgoCDNamespace)
+	service := testingutils.NewService()
+	service.SetName("test-service")
+	service.SetNamespace(testingutils.FakeArgoCDNamespace)
+
+	// Create a custom mock that can distinguish between dry-run and wet-run
+	customMockResourceOps := &customMockResourceOps{
+		MockResourceOps: &kubetest.MockResourceOps{
+			Commands: map[string]kubetest.KubectlOutput{
+				// No commands in the default map - will succeed
+			},
+		},
+	}
+
+	syncCtx.resourceOps = customMockResourceOps
+
+	syncCtx.resources = groupResources(ReconciliationResult{
+		Live:   []*unstructured.Unstructured{nil, nil},
+		Target: []*unstructured.Unstructured{pod, service},
+	})
+
+	// Run sync
+	syncCtx.Sync()
+	phase, _, resources := syncCtx.GetState()
+
+	// Verify sync completed with failure
+	assert.Equal(t, synccommon.OperationFailed, phase)
+	assert.Len(t, resources, 2)
+
+	// Verify callback was invoked (should be called even on partial failure)
+	assert.True(t, callbackInvoked, "Cache invalidation callback should have been invoked")
+
+	// Should only invalidate the successfully synced resource
+	assert.Len(t, invalidatedResources, 1, "Should have invalidated 1 resource")
+
+	// Verify the successful resource was invalidated
+	serviceKey := kube.GetResourceKey(service)
+	assert.Contains(t, invalidatedResources, serviceKey, "Should have invalidated the successful resource")
+}
+
+// customMockResourceOps is a custom mock that can distinguish between dry-run and wet-run
+type customMockResourceOps struct {
+	*kubetest.MockResourceOps
+}
+
+func (c *customMockResourceOps) ApplyResource(ctx context.Context, obj *unstructured.Unstructured, dryRunStrategy cmdutil.DryRunStrategy, force, validate, serverSideApply bool, manager string) (string, error) {
+	// Let dry-run succeed for both resources
+	if dryRunStrategy == cmdutil.DryRunClient {
+		return "dry-run successful", nil
+	}
+
+	// During wet-run, fail the pod but succeed the service
+	if obj.GetKind() == "Pod" && obj.GetName() == "test-pod" {
+		return "", errors.New("pod sync failed")
+	}
+
+	// For service, succeed
+	if obj.GetKind() == "Service" && obj.GetName() == "test-service" {
+		return "service sync successful", nil
+	}
+
+	// Default behavior for other resources
+	return c.MockResourceOps.ApplyResource(ctx, obj, dryRunStrategy, force, validate, serverSideApply, manager)
+}
+
+func TestSyncContext_CacheInvalidationCallback_NilCallback(t *testing.T) {
+	// Test with nil callback (should not panic)
+	syncCtx := newTestSyncCtx(nil,
+		WithOperationSettings(false, false, false, false),
+		WithCacheInvalidationCallback(nil),
+	)
+
+	pod := testingutils.NewPod()
+	pod.SetName("test-pod")
+	pod.SetNamespace(testingutils.FakeArgoCDNamespace)
+
+	syncCtx.resources = groupResources(ReconciliationResult{
+		Live:   []*unstructured.Unstructured{nil},
+		Target: []*unstructured.Unstructured{pod},
+	})
+
+	// Run sync - should not panic
+	syncCtx.Sync()
+	phase, _, resources := syncCtx.GetState()
+
+	// Verify sync completed successfully
+	assert.Equal(t, synccommon.OperationSucceeded, phase)
+	assert.Len(t, resources, 1)
+}

From 9be44f983784c7eeba7d8a544223bd99ad2c22fc Mon Sep 17 00:00:00 2001
From: Peter Jiang
Date: Thu, 10 Jul 2025 17:05:11 -0700
Subject: [PATCH 4/6] fix linting

Signed-off-by: Peter Jiang
---
 pkg/cache/cluster_test.go     |  4 +--
 pkg/engine/engine.go          |  2 +-
 pkg/sync/sync_context_test.go | 56 ++++++++++++++++++-----------------
 3 files changed, 32 insertions(+), 30 deletions(-)

diff --git a/pkg/cache/cluster_test.go b/pkg/cache/cluster_test.go
index 36b5a1b7e..ccd2c1068 100644
--- a/pkg/cache/cluster_test.go
+++ b/pkg/cache/cluster_test.go
@@ -1444,7 +1444,7 @@ func TestInvalidateResources(t *testing.T) {
 		cluster.InvalidateResources([]kube.ResourceKey{})
 
 		// Verify no resources were removed
-		assert.Equal(t, initialCount, len(cluster.resources), "No resources should have been invalidated")
+		assert.Len(t, cluster.resources, initialCount, "No resources should have been invalidated")
 	})
 
 	t.Run("InvalidateNonExistentResources", func(t *testing.T) {
@@ -1463,7 +1463,7 @@ func TestInvalidateResources(t *testing.T) {
 		cluster.InvalidateResources([]kube.ResourceKey{nonExistentKey})
 
 		// Verify no resources were removed
-		assert.Equal(t, initialCount, len(cluster.resources), "No resources should have been invalidated")
+		assert.Len(t, cluster.resources, initialCount, "No resources should have been invalidated")
 	})
 
 	t.Run("InvalidateResourcesUpdatesNamespaceIndex", func(t *testing.T) {
diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go
index e0fd6b92c..d3e5f0575 100644
--- a/pkg/engine/engine.go
+++ b/pkg/engine/engine.go
@@ -78,7 +78,7 @@ func (e *gitOpsEngine) Sync(ctx context.Context,
 	// This forces a refresh if the cache was invalidated
 	err := e.cache.EnsureSynced()
 	if err != nil {
-		return nil, fmt.Errorf("failed to ensure cache is synced: %w", err)
+		return nil, fmt.Errorf("error during sync: failed to ensure cache is synced: %w", err)
 	}
 
 	managedResources, err := e.cache.GetManagedLiveObjs(resources, isManaged)
diff --git a/pkg/sync/sync_context_test.go b/pkg/sync/sync_context_test.go
index 80dda11e3..9a5a329ce 100644
--- a/pkg/sync/sync_context_test.go
+++ b/pkg/sync/sync_context_test.go
@@ -3,7 +3,6 @@ package sync
 import (
 	"context"
 	"encoding/json"
-	"errors"
 	"fmt"
 	"net/http"
 	"net/http/httptest"
@@ -12,22 +11,22 @@ import (
 	"testing"
 	"time"
 
-	apierrors "k8s.io/apimachinery/pkg/api/errors"
-	"k8s.io/apimachinery/pkg/runtime/schema"
-
 	"github.com/go-logr/logr"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	corev1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/client-go/discovery"
 	fakedisco "k8s.io/client-go/discovery/fake"
 	"k8s.io/client-go/dynamic/fake"
 	"k8s.io/client-go/rest"
 	testcore "k8s.io/client-go/testing"
 	"k8s.io/klog/v2/textlogger"
+	cmdutil "k8s.io/kubectl/pkg/cmd/util"
 
 	"github.com/argoproj/gitops-engine/pkg/diff"
 	"github.com/argoproj/gitops-engine/pkg/health"
@@ -36,7 +35,6 @@ import (
 	"github.com/argoproj/gitops-engine/pkg/utils/kube"
 	"github.com/argoproj/gitops-engine/pkg/utils/kube/kubetest"
 	testingutils "github.com/argoproj/gitops-engine/pkg/utils/testing"
-	cmdutil "k8s.io/kubectl/pkg/cmd/util"
 )
 
 var standardVerbs = metav1.Verbs{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"}
@@ -106,7 +106,7 @@ func TestSyncValidate(t *testing.T) {
 
 func TestSyncNotPermittedNamespace(t *testing.T) {
 	syncCtx := newTestSyncCtx(nil, WithPermissionValidator(func(_ *unstructured.Unstructured, _ *metav1.APIResource) error {
-		return errors.New("not permitted in project")
+		return apierrors.NewNotFound(schema.GroupResource{}, testingutils.FakeArgoCDNamespace)
 	}))
 	targetPod := testingutils.NewPod()
 	targetPod.SetNamespace("kube-system")
@@ -149,7 +147,7 @@ func TestSyncNamespaceCreatedBeforeDryRunWithFailure(t *testing.T) {
 		resourceOps.Commands = map[string]kubetest.KubectlOutput{}
 		resourceOps.Commands[pod.GetName()] = kubetest.KubectlOutput{
 			Output: "should not be returned",
-			Err:    errors.New("invalid object failing dry-run"),
+			Err:    apierrors.NewNotFound(schema.GroupResource{}, testingutils.FakeArgoCDNamespace),
 		}
 	})
 	syncCtx.resources = groupResources(ReconciliationResult{
@@ -363,7 +361,7 @@ func TestSyncCreateFailure(t *testing.T) {
 		Commands: map[string]kubetest.KubectlOutput{
 			testSvc.GetName(): {
 				Output: "",
-				Err:    errors.New("foo"),
+				Err:    apierrors.NewNotFound(schema.GroupResource{}, testingutils.FakeArgoCDNamespace),
 			},
 		},
 	}
@@ -372,7 +370,7 @@ func TestSyncCreateFailure(t *testing.T) {
 		Commands: map[string]kubetest.KubectlOutput{
 			testSvc.GetName(): {
 				Output: "",
-				Err:    errors.New("foo"),
+				Err:    apierrors.NewNotFound(schema.GroupResource{}, testingutils.FakeArgoCDNamespace),
 			},
 		},
 	}
@@ -388,7 +386,7 @@ func TestSyncCreateFailure(t *testing.T) {
 	assert.Len(t, resources, 1)
 	result := resources[0]
 	assert.Equal(t, synccommon.ResultCodeSyncFailed, result.Status)
-	assert.Equal(t, "foo", result.Message)
+	assert.Equal(t, "invalid object failing dry-run", result.Message)
 }
 
 func TestSync_ApplyOutOfSyncOnly(t *testing.T) {
@@ -513,7 +511,7 @@ func TestSyncPruneFailure(t *testing.T) {
 		Commands: map[string]kubetest.KubectlOutput{
 			"test-service": {
 				Output: "",
-				Err:    errors.New("foo"),
+				Err:    apierrors.NewNotFound(schema.GroupResource{}, testingutils.FakeArgoCDNamespace),
 			},
 		},
 	}
@@ -522,7 +520,7 @@ func TestSyncPruneFailure(t *testing.T) {
 		Commands: map[string]kubetest.KubectlOutput{
 			"test-service": {
 				Output: "",
-				Err:    errors.New("foo"),
+				Err:    apierrors.NewNotFound(schema.GroupResource{}, testingutils.FakeArgoCDNamespace),
 			},
 		},
 	}
@@ -542,7 +540,7 @@ func TestSyncPruneFailure(t *testing.T) {
 	assert.Len(t, resources, 1)
 	result := resources[0]
 	assert.Equal(t, synccommon.ResultCodeSyncFailed, result.Status)
-	assert.Equal(t, "foo", result.Message)
+	assert.Equal(t, "invalid object failing dry-run", result.Message)
 }
 
 type APIServerMock struct {
@@ -1214,7 +1212,7 @@ func TestNamespaceAutoCreationForNonExistingNs(t *testing.T) {
 	creatorCalled := false
 	syncCtx.syncNamespace = func(_, _ *unstructured.Unstructured) (bool, error) {
 		creatorCalled = true
-		return false, errors.New("some error")
+		return false, apierrors.NewNotFound(schema.GroupResource{}, testingutils.FakeArgoCDNamespace)
 	}
 	tasks, successful := syncCtx.getSyncTasks()
 
@@ -1228,7 +1226,7 @@ func TestNamespaceAutoCreationForNonExistingNs(t *testing.T) {
 			skipDryRun:     false,
 			syncStatus:     synccommon.ResultCodeSyncFailed,
 			operationState: synccommon.OperationError,
-			message:        "namespaceModifier error: some error",
+			message:        "namespaceModifier error: invalid object failing dry-run",
 			waveOverride:   nil,
 		}, tasks[0])
 	})
@@ -1269,11 +1267,11 @@ func TestSyncFailureHookWithFailedSync(t *testing.T) {
 	})
 	syncCtx.hooks = []*unstructured.Unstructured{newHook(synccommon.HookTypeSyncFail)}
 	mockKubectl := &kubetest.MockKubectlCmd{
-		Commands: map[string]kubetest.KubectlOutput{pod.GetName(): {Err: errors.New("")}},
+		Commands: map[string]kubetest.KubectlOutput{pod.GetName(): {Err: apierrors.NewNotFound(schema.GroupResource{}, testingutils.FakeArgoCDNamespace)}},
 	}
 	syncCtx.kubectl = mockKubectl
 	mockResourceOps := kubetest.MockResourceOps{
-		Commands: map[string]kubetest.KubectlOutput{pod.GetName(): {Err: errors.New("")}},
+		Commands: map[string]kubetest.KubectlOutput{pod.GetName(): {Err: apierrors.NewNotFound(schema.GroupResource{}, testingutils.FakeArgoCDNamespace)}},
 	}
 	syncCtx.resourceOps = &mockResourceOps
 
@@ -1322,18 +1320,18 @@ func TestRunSyncFailHooksFailed(t *testing.T) {
 	mockKubectl := &kubetest.MockKubectlCmd{
 		Commands: map[string]kubetest.KubectlOutput{
 			// Fail operation
-			pod.GetName(): {Err: errors.New("")},
+			pod.GetName(): {Err: apierrors.NewNotFound(schema.GroupResource{}, testingutils.FakeArgoCDNamespace)},
 			// Fail a single SyncFail hook
-			failedSyncFailHook.GetName(): {Err: errors.New("")},
+			failedSyncFailHook.GetName(): {Err: apierrors.NewNotFound(schema.GroupResource{}, testingutils.FakeArgoCDNamespace)},
 		},
 	}
 	syncCtx.kubectl = mockKubectl
 	mockResourceOps := kubetest.MockResourceOps{
 		Commands: map[string]kubetest.KubectlOutput{
 			// Fail operation
-			pod.GetName(): {Err: errors.New("")},
+			pod.GetName(): {Err: apierrors.NewNotFound(schema.GroupResource{}, testingutils.FakeArgoCDNamespace)},
 			// Fail a single SyncFail hook
-			failedSyncFailHook.GetName(): {Err: errors.New("")},
+			failedSyncFailHook.GetName(): {Err: apierrors.NewNotFound(schema.GroupResource{}, testingutils.FakeArgoCDNamespace)},
 		},
 	}
 	syncCtx.resourceOps = &mockResourceOps
@@ -1698,13 +1696,13 @@ func TestSyncWaveHookFail(t *testing.T) {
 	called := false
 	syncCtx.syncWaveHook = func(_ synccommon.SyncPhase, _ int, _ bool) error {
 		called = true
-		return errors.New("intentional error")
+		return apierrors.NewNotFound(schema.GroupResource{}, testingutils.FakeArgoCDNamespace)
 	}
 	syncCtx.Sync()
 	assert.True(t, called)
 	phase, msg, results := syncCtx.GetState()
 	assert.Equal(t, synccommon.OperationFailed, phase)
-	assert.Equal(t, "SyncWaveHook failed: intentional error", msg)
+	assert.Equal(t, "SyncWaveHook failed: invalid object failing dry-run", msg)
 	assert.Equal(t, synccommon.OperationRunning, results[0].HookPhase)
 }
 
@@ -2342,11 +2340,11 @@ func TestSyncContext_CacheInvalidationCallback_NoResources(t *testing.T) {
 
 	// Verify sync completed successfully
 	assert.Equal(t, synccommon.OperationSucceeded, phase)
-	assert.Len(t, resources, 0)
+	assert.Empty(t, resources)
 
 	// Verify callback was invoked with empty resource list
 	assert.True(t, callbackInvoked, "Cache invalidation callback should have been invoked")
-	assert.Len(t, invalidatedResources, 0, "Should have invalidated 0 resources")
+	assert.Empty(t, invalidatedResources, "Should have invalidated 0 resources")
 }
 
 func TestSyncContext_CacheInvalidationCallback_PartialFailure(t *testing.T) {
@@ -2420,7 +2418,7 @@ func (c *customMockResourceOps) ApplyResource(ctx context.Context, obj *unstruct
 
 	// During wet-run, fail the pod but succeed the service
 	if obj.GetKind() == "Pod" && obj.GetName() == "test-pod" {
-		return "", errors.New("pod sync failed")
+		return "", apierrors.NewNotFound(schema.GroupResource{}, testingutils.FakeArgoCDNamespace)
 	}
 
 	// For service, succeed
@@ -2429,7 +2427,11 @@ func (c *customMockResourceOps) ApplyResource(ctx context.Context, obj *unstruct
 	}
 
 	// Default behavior for other resources
-	return c.MockResourceOps.ApplyResource(ctx, obj, dryRunStrategy, force, validate, serverSideApply, manager)
+	_, err := c.MockResourceOps.ApplyResource(ctx, obj, dryRunStrategy, force, validate, serverSideApply, manager)
+	if err != nil {
+		return "", fmt.Errorf("apply resource failed: %w", err)
+	}
+	return "default success", nil
 }
 
 func TestSyncContext_CacheInvalidationCallback_NilCallback(t *testing.T) {

From 5c61ab6fef4bd1af2c7951cf98d708f30fc42edd Mon Sep 17 00:00:00 2001
From: Peter Jiang
Date: Thu, 10 Jul 2025 17:14:28 -0700
Subject: [PATCH 5/6] fix linting

Signed-off-by: Peter Jiang
---
 pkg/sync/sync_context_test.go | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/pkg/sync/sync_context_test.go b/pkg/sync/sync_context_test.go
index 9a5a329ce..a040be9c8 100644
--- a/pkg/sync/sync_context_test.go
+++ b/pkg/sync/sync_context_test.go
@@ -3,6 +3,7 @@ package sync
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"net/http"
 	"net/http/httptest"
@@ -106,7 +107,7 @@ func TestSyncValidate(t *testing.T) {
 
 func TestSyncNotPermittedNamespace(t *testing.T) {
 	syncCtx := newTestSyncCtx(nil, WithPermissionValidator(func(_ *unstructured.Unstructured, _ *metav1.APIResource) error {
-		return apierrors.NewNotFound(schema.GroupResource{}, testingutils.FakeArgoCDNamespace)
+		return errors.New("not permitted in project")
 	}))
 	targetPod := testingutils.NewPod()
 	targetPod.SetNamespace("kube-system")
@@ -147,7 +148,7 @@ func TestSyncNamespaceCreatedBeforeDryRunWithFailure(t *testing.T) {
 		resourceOps.Commands = map[string]kubetest.KubectlOutput{}
 		resourceOps.Commands[pod.GetName()] = kubetest.KubectlOutput{
 			Output: "should not be returned",
-			Err:    apierrors.NewNotFound(schema.GroupResource{}, testingutils.FakeArgoCDNamespace),
+			Err:    errors.New("invalid object failing dry-run"),
 		}
 	})
 	syncCtx.resources = groupResources(ReconciliationResult{
@@ -511,7 +512,7 @@ func TestSyncPruneFailure(t *testing.T) {
 		Commands: map[string]kubetest.KubectlOutput{
 			"test-service": {
 				Output: "",
-				Err:    apierrors.NewNotFound(schema.GroupResource{}, testingutils.FakeArgoCDNamespace),
+				Err:    errors.New("foo"),
 			},
 		},
 	}
@@ -520,7 +521,7 @@ func TestSyncPruneFailure(t *testing.T) {
 		Commands: map[string]kubetest.KubectlOutput{
 			"test-service": {
 				Output: "",
-				Err:    apierrors.NewNotFound(schema.GroupResource{}, testingutils.FakeArgoCDNamespace),
+				Err:    errors.New("foo"),
 			},
 		},
 	}
@@ -540,7 +541,7 @@ func TestSyncPruneFailure(t *testing.T) {
 	assert.Len(t, resources, 1)
 	result := resources[0]
 	assert.Equal(t, synccommon.ResultCodeSyncFailed, result.Status)
-	assert.Equal(t, "invalid object failing dry-run", result.Message)
+	assert.Equal(t, "foo", result.Message)
 }
 
 type APIServerMock struct {
@@ -1212,7 +1213,7 @@ func TestNamespaceAutoCreationForNonExistingNs(t *testing.T) {
 	creatorCalled := false
 	syncCtx.syncNamespace = func(_, _ *unstructured.Unstructured) (bool, error) {
 		creatorCalled = true
-		return false, apierrors.NewNotFound(schema.GroupResource{}, testingutils.FakeArgoCDNamespace)
+		return false, errors.New("some error")
 	}
 	tasks, successful := syncCtx.getSyncTasks()
 
@@ -1226,7 +1227,7 @@ func TestNamespaceAutoCreationForNonExistingNs(t *testing.T) {
 			skipDryRun:     false,
 			syncStatus:     synccommon.ResultCodeSyncFailed,
 			operationState: synccommon.OperationError,
-			message:        "namespaceModifier error: invalid object failing dry-run",
+			message:        "namespaceModifier error: some error",
 			waveOverride:   nil,
 		}, tasks[0])
 	})
@@ -1696,13 +1697,13 @@ func TestSyncWaveHookFail(t *testing.T) {
 	called := false
 	syncCtx.syncWaveHook = func(_ synccommon.SyncPhase, _ int, _ bool) error {
 		called = true
-		return apierrors.NewNotFound(schema.GroupResource{}, testingutils.FakeArgoCDNamespace)
+		return errors.New("intentional error")
 	}
 	syncCtx.Sync()
 	assert.True(t, called)
 	phase, msg, results := syncCtx.GetState()
 	assert.Equal(t, synccommon.OperationFailed, phase)
-	assert.Equal(t, "SyncWaveHook failed: invalid object failing dry-run", msg)
+	assert.Equal(t, "SyncWaveHook failed: intentional error", msg)
 	assert.Equal(t, synccommon.OperationRunning, results[0].HookPhase)
 }

From f303abef3cf5310e73651145517f1e9d26be29e1 Mon Sep 17 00:00:00 2001
From: Peter Jiang
Date: Thu, 10 Jul 2025 17:25:26 -0700
Subject: [PATCH 6/6] fix test

Signed-off-by: Peter Jiang
---
 pkg/sync/sync_context_test.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pkg/sync/sync_context_test.go b/pkg/sync/sync_context_test.go
index a040be9c8..8a0ac7e5e 100644
--- a/pkg/sync/sync_context_test.go
+++ b/pkg/sync/sync_context_test.go
@@ -362,7 +362,7 @@ func TestSyncCreateFailure(t *testing.T) {
 		Commands: map[string]kubetest.KubectlOutput{
 			testSvc.GetName(): {
 				Output: "",
-				Err:    apierrors.NewNotFound(schema.GroupResource{}, testingutils.FakeArgoCDNamespace),
+				Err:    errors.New("invalid object failing dry-run"),
 			},
 		},
 	}
@@ -371,7 +371,7 @@ func TestSyncCreateFailure(t *testing.T) {
 		Commands: map[string]kubetest.KubectlOutput{
 			testSvc.GetName(): {
 				Output: "",
-				Err:    apierrors.NewNotFound(schema.GroupResource{}, testingutils.FakeArgoCDNamespace),
+				Err:    errors.New("invalid object failing dry-run"),
 			},
 		},
 	}