fix: invalidate cluster cache after sync operations #745

Open · wants to merge 6 commits into master
25 changes: 25 additions & 0 deletions pkg/cache/cluster.go
@@ -128,6 +128,8 @@ type ClusterCache interface {
GetGVKParser() *managedfields.GvkParser
// Invalidate cache and executes callback that optionally might update cache settings
Invalidate(opts ...UpdateSettingsFunc)
// InvalidateResources invalidates specific resources in the cache
InvalidateResources(keys []kube.ResourceKey)
// FindResources returns resources that matches given list of predicates from specified namespace or everywhere if specified namespace is empty
FindResources(namespace string, predicates ...func(r *Resource) bool) map[kube.ResourceKey]*Resource
// IterateHierarchy iterates resource tree starting from the specified top level resource and executes callback for each resource in the tree.
@@ -494,6 +496,29 @@ func (c *clusterCache) Invalidate(opts ...UpdateSettingsFunc) {
c.log.Info("Invalidated cluster")
}

// InvalidateResources invalidates specific resources in the cache by removing them.
// This forces the resources to be refreshed from the cluster on the next access.
func (c *clusterCache) InvalidateResources(keys []kube.ResourceKey) {
if len(keys) == 0 {
return
}

c.lock.Lock()
defer c.lock.Unlock()

for _, key := range keys {
if _, exists := c.resources[key]; exists {
// Remove the resource from cache - this will force it to be refreshed on next access
c.onNodeRemoved(key)
c.log.Info("Invalidated resource from cache", "key", key.String())
} else {
c.log.Info("Resource not found in cache for invalidation", "key", key.String())
}
}

c.log.Info("Invalidated specific resources from cache", "count", len(keys))
}

// clusterCacheSync's lock should be held before calling this method
func (syncStatus *clusterCacheSync) synced(clusterRetryTimeout time.Duration) bool {
syncTime := syncStatus.syncTime
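A minimal usage sketch of the new ClusterCache.InvalidateResources API: drop a single Deployment from the cache so that the next access re-reads it from the cluster. The helper name, the clusterCache parameter, and the import paths are assumptions for illustration; cache construction and syncing are omitted.

package example

import (
	"github.com/argoproj/gitops-engine/pkg/cache"
	"github.com/argoproj/gitops-engine/pkg/utils/kube"
)

// invalidateDeployment builds the ResourceKey for one Deployment and asks the
// cluster cache to drop it; the entry is refreshed from the cluster on the
// next access. This helper is hypothetical and only illustrates the new API.
func invalidateDeployment(clusterCache cache.ClusterCache, namespace, name string) {
	key := kube.ResourceKey{
		Group:     "apps",
		Kind:      "Deployment",
		Namespace: namespace,
		Name:      name,
	}
	clusterCache.InvalidateResources([]kube.ResourceKey{key})
}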
94 changes: 94 additions & 0 deletions pkg/cache/cluster_test.go
@@ -1393,3 +1393,97 @@ func BenchmarkIterateHierarchyV2(b *testing.B) {
// })
// }
//}

func TestInvalidateResources(t *testing.T) {
// Create test resources
pod1 := testPod1()
pod2 := testPod2()
rs := testRS()
deploy := testDeploy()

// Test invalidating specific resources
t.Run("InvalidateSpecificResources", func(t *testing.T) {
cluster := newCluster(t, pod1, pod2, rs, deploy)
err := cluster.EnsureSynced()
require.NoError(t, err)

// Verify all resources are initially in cache
initialResources := cluster.resources
require.Len(t, initialResources, 4)

keysToInvalidate := []kube.ResourceKey{
kube.GetResourceKey(mustToUnstructured(pod1)),
kube.GetResourceKey(mustToUnstructured(pod2)),
}

cluster.InvalidateResources(keysToInvalidate)

// Verify the specific resources were removed from cache
for _, key := range keysToInvalidate {
_, exists := cluster.resources[key]
assert.False(t, exists, "Resource %s should have been invalidated", key.String())
}

// Verify other resources remain in cache
rsKey := kube.GetResourceKey(mustToUnstructured(rs))
deployKey := kube.GetResourceKey(mustToUnstructured(deploy))

_, exists := cluster.resources[rsKey]
assert.True(t, exists, "Resource %s should still be in cache", rsKey.String())

_, exists = cluster.resources[deployKey]
assert.True(t, exists, "Resource %s should still be in cache", deployKey.String())
})

t.Run("InvalidateEmptyList", func(t *testing.T) {
cluster := newCluster(t, pod1, pod2, rs, deploy)
err := cluster.EnsureSynced()
require.NoError(t, err)

initialCount := len(cluster.resources)
cluster.InvalidateResources([]kube.ResourceKey{})

// Verify no resources were removed
assert.Len(t, cluster.resources, initialCount, "No resources should have been invalidated")
})

t.Run("InvalidateNonExistentResources", func(t *testing.T) {
cluster := newCluster(t, pod1, pod2, rs, deploy)
err := cluster.EnsureSynced()
require.NoError(t, err)

initialCount := len(cluster.resources)
nonExistentKey := kube.ResourceKey{
Group: "apps",
Kind: "Deployment",
Namespace: "non-existent",
Name: "non-existent-deploy",
}

cluster.InvalidateResources([]kube.ResourceKey{nonExistentKey})

// Verify no resources were removed
assert.Len(t, cluster.resources, initialCount, "No resources should have been invalidated")
})

t.Run("InvalidateResourcesUpdatesNamespaceIndex", func(t *testing.T) {
cluster := newCluster(t, pod1, pod2, rs, deploy)
err := cluster.EnsureSynced()
require.NoError(t, err)

pod1Key := kube.GetResourceKey(mustToUnstructured(pod1))

// Verify resource is in namespace index
nsResources := cluster.nsIndex[pod1Key.Namespace]
_, exists := nsResources[pod1Key]
assert.True(t, exists, "Resource should exist in namespace index")

// Invalidate the resource
cluster.InvalidateResources([]kube.ResourceKey{pod1Key})

// Verify resource is removed from namespace index
nsResources = cluster.nsIndex[pod1Key.Namespace]
_, exists = nsResources[pod1Key]
assert.False(t, exists, "Resource should be removed from namespace index")
})
}
16 changes: 16 additions & 0 deletions pkg/engine/engine.go
@@ -74,6 +74,13 @@ func (e *gitOpsEngine) Sync(ctx context.Context,
namespace string,
opts ...sync.SyncOpt,
) ([]common.ResourceSyncResult, error) {
// Ensure cache is synced before getting managed live objects
// This forces a refresh if the cache was invalidated
err := e.cache.EnsureSynced()
if err != nil {
return nil, fmt.Errorf("error during sync: failed to ensure cache is synced: %w", err)
}

managedResources, err := e.cache.GetManagedLiveObjs(resources, isManaged)
if err != nil {
return nil, fmt.Errorf("failed to get managed live objects: %w", err)
@@ -84,6 +91,15 @@ func (e *gitOpsEngine) Sync(ctx context.Context,
return nil, fmt.Errorf("failed to diff objects: %w", err)
}
opts = append(opts, sync.WithSkipHooks(!diffRes.Modified))

// Add cache invalidation callback to invalidate cache for modified resources after sync
opts = append(opts, sync.WithCacheInvalidationCallback(func(modifiedResources []kube.ResourceKey) {
// Only invalidate the specific resources that were modified
if len(modifiedResources) > 0 {
e.cache.InvalidateResources(modifiedResources)
}
}))

syncCtx, cleanup, err := sync.NewSyncContext(revision, result, e.config, e.config, e.kubectl, namespace, e.cache.GetOpenAPISchema(), opts...)
if err != nil {
return nil, fmt.Errorf("failed to create sync context: %w", err)
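A minimal sketch of the refresh cycle these engine changes rely on, under the assumption that the caller owns the ClusterCache: keys touched by a sync are dropped via InvalidateResources, and the EnsureSynced call at the top of Sync repopulates the cache on the next run. The helper and parameter names are placeholders, not part of this PR.

package example

import (
	"github.com/argoproj/gitops-engine/pkg/cache"
	"github.com/argoproj/gitops-engine/pkg/utils/kube"
)

// refreshAfterSync drops the modified keys from the cache and then re-syncs,
// mirroring the invalidate-then-EnsureSynced flow the engine uses above.
func refreshAfterSync(clusterCache cache.ClusterCache, modified []kube.ResourceKey) error {
	clusterCache.InvalidateResources(modified)
	return clusterCache.EnsureSynced()
}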
44 changes: 44 additions & 0 deletions pkg/sync/sync_context.go
@@ -218,6 +218,14 @@ func WithClientSideApplyMigration(enabled bool, manager string) SyncOpt {
}
}

// WithCacheInvalidationCallback sets a callback that will be invoked after successful sync operations
// to invalidate the cache
func WithCacheInvalidationCallback(callback func([]kubeutil.ResourceKey)) SyncOpt {
return func(ctx *syncContext) {
ctx.cacheInvalidationCallback = callback
}
}

// NewSyncContext creates new instance of a SyncContext
func NewSyncContext(
revision string,
@@ -389,6 +397,10 @@ type syncContext struct {
applyOutOfSyncOnly bool
// stores whether the resource is modified or not
modificationResult map[kubeutil.ResourceKey]bool

// cacheInvalidationCallback is a callback that will be invoked after successful sync operations
// to invalidate the cache
cacheInvalidationCallback func([]kubeutil.ResourceKey)
}

func (sc *syncContext) setRunningPhase(tasks []*syncTask, isPendingDeletion bool) {
@@ -557,6 +569,15 @@ func (sc *syncContext) Sync() {
// delete all completed hooks which have appropriate delete policy
sc.deleteHooks(hooksPendingDeletionSuccessful)
sc.setOperationPhase(common.OperationSucceeded, "successfully synced (no more tasks)")

// Invalidate cache after successful sync
if sc.cacheInvalidationCallback != nil {
modifiedResources := make([]kubeutil.ResourceKey, 0, len(sc.syncRes))
for _, result := range sc.syncRes {
modifiedResources = append(modifiedResources, result.ResourceKey)
}
sc.cacheInvalidationCallback(modifiedResources)
}
return
}

@@ -593,11 +614,34 @@ func (sc *syncContext) Sync() {
syncFailedTasks, _ := tasks.Split(func(t *syncTask) bool { return t.syncStatus == common.ResultCodeSyncFailed })
sc.deleteHooks(hooksPendingDeletionFailed)
sc.setOperationFailed(syncFailTasks, syncFailedTasks, "one or more objects failed to apply")

// Invalidate cache for successfully synced resources even if overall operation failed
if sc.cacheInvalidationCallback != nil {
var modifiedResources []kubeutil.ResourceKey
for _, result := range sc.syncRes {
// Only invalidate resources that were successfully synced
if result.Status == common.ResultCodeSynced {
modifiedResources = append(modifiedResources, result.ResourceKey)
}
}
if len(modifiedResources) > 0 {
sc.cacheInvalidationCallback(modifiedResources)
}
}
case successful:
if remainingTasks.Len() == 0 {
// delete all completed hooks which have appropriate delete policy
sc.deleteHooks(hooksPendingDeletionSuccessful)
sc.setOperationPhase(common.OperationSucceeded, "successfully synced (all tasks run)")

// Invalidate cache after successful sync
if sc.cacheInvalidationCallback != nil {
modifiedResources := make([]kubeutil.ResourceKey, 0, len(sc.syncRes))
for _, result := range sc.syncRes {
modifiedResources = append(modifiedResources, result.ResourceKey)
}
sc.cacheInvalidationCallback(modifiedResources)
}
} else {
sc.setRunningPhase(remainingTasks, false)
}
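For callers that construct a sync context directly instead of going through gitOpsEngine.Sync, a sketch of wiring the new option is shown below. It mirrors the engine wiring above; the wrapper function, its parameter names, and the caller-owned clusterCache are assumptions for illustration, not part of this PR.

package example

import (
	"k8s.io/client-go/rest"
	"k8s.io/kubectl/pkg/util/openapi"

	"github.com/argoproj/gitops-engine/pkg/cache"
	"github.com/argoproj/gitops-engine/pkg/sync"
	"github.com/argoproj/gitops-engine/pkg/utils/kube"
)

// newSyncContextWithInvalidation appends the cache-invalidation option before
// delegating to sync.NewSyncContext, so resources touched by the sync are
// dropped from the caller-owned cluster cache once the operation completes.
func newSyncContextWithInvalidation(
	revision string,
	result sync.ReconciliationResult,
	config *rest.Config,
	kubectl kube.Kubectl,
	namespace string,
	schema openapi.Resources,
	clusterCache cache.ClusterCache,
	opts ...sync.SyncOpt,
) (sync.SyncContext, func(), error) {
	opts = append(opts, sync.WithCacheInvalidationCallback(func(modified []kube.ResourceKey) {
		// Only invalidate when the sync actually touched something.
		if len(modified) > 0 {
			clusterCache.InvalidateResources(modified)
		}
	}))
	return sync.NewSyncContext(revision, result, config, config, kubectl, namespace, schema, opts...)
}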