Skip to content

Commit cd063ab

Browse files
committed
fix the issue that the relevant fields in rb and pp are inconsistent
Signed-off-by: zhzhuang-zju <[email protected]>
1 parent 6639cdc commit cd063ab

File tree

3 files changed

+152
-15
lines changed

3 files changed

+152
-15
lines changed

pkg/detector/detector.go

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -337,13 +337,26 @@ func (d *ResourceDetector) OnUpdate(oldObj, newObj interface{}) {
337337
return
338338
}
339339

340-
resourceChangeByKarmada := eventfilter.ResourceChangeByKarmada(unstructuredOldObj, unstructuredNewObj)
340+
isLazyActivation, err := d.isClaimedByLazyPolicy(unstructuredNewObj)
341+
if err != nil {
342+
// should never come here
343+
klog.Errorf("Failed to check if the object (kind=%s, %s/%s) is bound by lazy policy. err: %v", unstructuredNewObj.GetKind(), unstructuredNewObj.GetNamespace(), unstructuredNewObj.GetName(), err)
344+
}
341345

342-
resourceItem := ResourceItem{
343-
Obj: newRuntimeObj,
344-
ResourceChangeByKarmada: resourceChangeByKarmada,
346+
if isLazyActivation {
347+
resourceItem := ResourceItem{
348+
Obj: newRuntimeObj,
349+
ResourceChangeByKarmada: eventfilter.ResourceChangeByKarmada(unstructuredOldObj, unstructuredNewObj),
350+
}
351+
352+
d.Processor.Enqueue(resourceItem)
353+
return
345354
}
346355

356+
// For non-lazy policies, it is no need to distinguish whether the change is from Karmada or not.
357+
resourceItem := ResourceItem{
358+
Obj: newRuntimeObj,
359+
}
347360
d.Processor.Enqueue(resourceItem)
348361
}
349362

@@ -1278,7 +1291,7 @@ func (d *ResourceDetector) HandlePropagationPolicyCreationOrUpdate(policy *polic
12781291
if err != nil {
12791292
return err
12801293
}
1281-
d.Processor.Add(keys.ClusterWideKeyWithConfig{ClusterWideKey: resourceKey, ResourceChangeByKarmada: true})
1294+
d.enqueueResourceTemplateForPolicyChange(resourceKey, policy.Spec.ActivationPreference)
12821295
}
12831296

12841297
// check whether there are matched RT in waiting list, is so, add it to processor
@@ -1296,7 +1309,7 @@ func (d *ResourceDetector) HandlePropagationPolicyCreationOrUpdate(policy *polic
12961309

12971310
for _, key := range matchedKeys {
12981311
d.RemoveWaiting(key)
1299-
d.Processor.Add(keys.ClusterWideKeyWithConfig{ClusterWideKey: key, ResourceChangeByKarmada: true})
1312+
d.enqueueResourceTemplateForPolicyChange(key, policy.Spec.ActivationPreference)
13001313
}
13011314

13021315
// If preemption is enabled, handle the preemption process.
@@ -1345,14 +1358,14 @@ func (d *ResourceDetector) HandleClusterPropagationPolicyCreationOrUpdate(policy
13451358
if err != nil {
13461359
return err
13471360
}
1348-
d.Processor.Add(keys.ClusterWideKeyWithConfig{ClusterWideKey: resourceKey, ResourceChangeByKarmada: true})
1361+
d.enqueueResourceTemplateForPolicyChange(resourceKey, policy.Spec.ActivationPreference)
13491362
}
13501363
for _, crb := range clusterResourceBindings.Items {
13511364
resourceKey, err := helper.ConstructClusterWideKey(crb.Spec.Resource)
13521365
if err != nil {
13531366
return err
13541367
}
1355-
d.Processor.Add(keys.ClusterWideKeyWithConfig{ClusterWideKey: resourceKey, ResourceChangeByKarmada: true})
1368+
d.enqueueResourceTemplateForPolicyChange(resourceKey, policy.Spec.ActivationPreference)
13561369
}
13571370

13581371
matchedKeys := d.GetMatching(policy.Spec.ResourceSelectors)
@@ -1369,7 +1382,7 @@ func (d *ResourceDetector) HandleClusterPropagationPolicyCreationOrUpdate(policy
13691382

13701383
for _, key := range matchedKeys {
13711384
d.RemoveWaiting(key)
1372-
d.Processor.Add(keys.ClusterWideKeyWithConfig{ClusterWideKey: key, ResourceChangeByKarmada: true})
1385+
d.enqueueResourceTemplateForPolicyChange(key, policy.Spec.ActivationPreference)
13731386
}
13741387

13751388
// If preemption is enabled, handle the preemption process.
@@ -1512,3 +1525,21 @@ func (d *ResourceDetector) CleanupClusterResourceBindingClaimMetadata(crbName st
15121525
return updateErr
15131526
})
15141527
}
1528+
1529+
// enqueueResourceTemplateForPolicyChange enqueues a resource template key for reconciliation in response to a
1530+
// PropagationPolicy or ClusterPropagationPolicy change. If the policy's ActivationPreference is set to Lazy,
1531+
// the ResourceChangeByKarmada flag is set to true, indicating that the resource template is being enqueued
1532+
// due to a policy change and should not be propagated to member clusters. For non-lazy policies, this flag
1533+
// is omitted as the distinction is unnecessary.
1534+
//
1535+
// Note: Setting ResourceChangeByKarmada changes the effective queue key. Mixing both true/false for the same
1536+
// resource may result in two different queue keys being processed concurrently, which can cause race conditions.
1537+
// Therefore, only set ResourceChangeByKarmada in lazy activation mode.
1538+
// For more details, see: https://github.com/karmada-io/karmada/issues/5996.
1539+
func (d *ResourceDetector) enqueueResourceTemplateForPolicyChange(key keys.ClusterWideKey, pref policyv1alpha1.ActivationPreference) {
1540+
if util.IsLazyActivationEnabled(pref) {
1541+
d.Processor.Add(keys.ClusterWideKeyWithConfig{ClusterWideKey: key, ResourceChangeByKarmada: true})
1542+
return
1543+
}
1544+
d.Processor.Add(keys.ClusterWideKeyWithConfig{ClusterWideKey: key})
1545+
}

pkg/detector/detector_test.go

Lines changed: 67 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,6 @@ func TestOnUpdate(t *testing.T) {
431431
oldObj interface{}
432432
newObj interface{}
433433
expectedEnqueue bool
434-
expectedChangeByKarmada bool
435434
expectToUnstructuredError bool
436435
}{
437436
{
@@ -462,8 +461,7 @@ func TestOnUpdate(t *testing.T) {
462461
},
463462
},
464463
},
465-
expectedEnqueue: true,
466-
expectedChangeByKarmada: false,
464+
expectedEnqueue: true,
467465
},
468466
{
469467
name: "update without changes",
@@ -526,8 +524,7 @@ func TestOnUpdate(t *testing.T) {
526524
},
527525
},
528526
},
529-
expectedEnqueue: true,
530-
expectedChangeByKarmada: true,
527+
expectedEnqueue: true,
531528
},
532529
{
533530
name: "core v1 object",
@@ -575,7 +572,6 @@ func TestOnUpdate(t *testing.T) {
575572
assert.IsType(t, ResourceItem{}, mockProcessor.lastEnqueued, "Enqueued item should be of type ResourceItem")
576573
enqueued := mockProcessor.lastEnqueued.(ResourceItem)
577574
assert.Equal(t, tt.newObj, enqueued.Obj, "Enqueued object should match the new object")
578-
assert.Equal(t, tt.expectedChangeByKarmada, enqueued.ResourceChangeByKarmada, "ResourceChangeByKarmada flag should match expected value")
579575
} else {
580576
assert.Equal(t, 0, mockProcessor.enqueueCount, "Object should not be enqueued")
581577
}
@@ -973,6 +969,71 @@ func TestApplyClusterPolicy(t *testing.T) {
973969
}
974970
}
975971

972+
func TestEnqueueResourceKeyWithActivationPref(t *testing.T) {
973+
testClusterWideKey := keys.ClusterWideKey{
974+
Group: "foo",
975+
Version: "foo",
976+
Kind: "foo",
977+
Namespace: "foo",
978+
Name: "foo",
979+
}
980+
tests := []struct {
981+
name string
982+
key keys.ClusterWideKey
983+
pref policyv1alpha1.ActivationPreference
984+
want keys.ClusterWideKeyWithConfig
985+
}{
986+
{
987+
name: "lazy pp and resourceChangeByKarmada is true",
988+
key: testClusterWideKey,
989+
pref: policyv1alpha1.LazyActivation,
990+
want: keys.ClusterWideKeyWithConfig{
991+
ClusterWideKey: testClusterWideKey,
992+
ResourceChangeByKarmada: true,
993+
},
994+
},
995+
{
996+
name: "non-lazy ignores ResourceChangeByKarmada",
997+
key: testClusterWideKey,
998+
pref: "",
999+
want: keys.ClusterWideKeyWithConfig{
1000+
ClusterWideKey: testClusterWideKey,
1001+
ResourceChangeByKarmada: false,
1002+
},
1003+
},
1004+
}
1005+
for _, tt := range tests {
1006+
t.Run(tt.name, func(t *testing.T) {
1007+
ctx, cancel := context.WithCancel(context.Background())
1008+
detector := ResourceDetector{
1009+
Processor: util.NewAsyncWorker(util.Options{
1010+
Name: "resource detector",
1011+
KeyFunc: ResourceItemKeyFunc,
1012+
ReconcileFunc: func(key util.QueueKey) (err error) {
1013+
defer cancel()
1014+
defer func() {
1015+
assert.NoError(t, err)
1016+
}()
1017+
clusterWideKeyWithConfig, ok := key.(keys.ClusterWideKeyWithConfig)
1018+
if !ok {
1019+
err = fmt.Errorf("invalid key")
1020+
return err
1021+
}
1022+
if clusterWideKeyWithConfig != tt.want {
1023+
err = fmt.Errorf("unexpected key. want:%+v, got:%+v", tt.want, clusterWideKeyWithConfig)
1024+
return err
1025+
}
1026+
return nil
1027+
},
1028+
}),
1029+
}
1030+
detector.Processor.Run(1, ctx.Done())
1031+
detector.enqueueResourceTemplateForPolicyChange(tt.key, tt.pref)
1032+
<-ctx.Done()
1033+
})
1034+
}
1035+
}
1036+
9761037
// Helper Functions
9771038

9781039
// setupTestScheme creates a runtime scheme with necessary types for testing

pkg/detector/policy.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,51 @@ func (d *ResourceDetector) listCPPDerivedCRBs(policyID, policyName string) (*wor
373373
return bindings, nil
374374
}
375375

376+
func (d *ResourceDetector) isClaimedByLazyPolicy(obj *unstructured.Unstructured) (bool, error) {
377+
policyAnnotations := obj.GetAnnotations()
378+
policyLabels := obj.GetLabels()
379+
policyNamespace := util.GetAnnotationValue(policyAnnotations, policyv1alpha1.PropagationPolicyNamespaceAnnotation)
380+
policyName := util.GetAnnotationValue(policyAnnotations, policyv1alpha1.PropagationPolicyNameAnnotation)
381+
claimedID := util.GetLabelValue(policyLabels, policyv1alpha1.PropagationPolicyPermanentIDLabel)
382+
if policyNamespace != "" && policyName != "" && claimedID != "" {
383+
policyObject, err := d.propagationPolicyLister.ByNamespace(policyNamespace).Get(policyName)
384+
if err != nil {
385+
if apierrors.IsNotFound(err) {
386+
return false, nil
387+
}
388+
389+
return false, err
390+
}
391+
matchedPropagationPolicy := &policyv1alpha1.PropagationPolicy{}
392+
if err = helper.ConvertToTypedObject(policyObject, matchedPropagationPolicy); err != nil {
393+
return false, err
394+
}
395+
396+
return util.IsLazyActivationEnabled(matchedPropagationPolicy.Spec.ActivationPreference), nil
397+
}
398+
399+
policyName = util.GetAnnotationValue(policyAnnotations, policyv1alpha1.ClusterPropagationPolicyAnnotation)
400+
claimedID = util.GetLabelValue(policyLabels, policyv1alpha1.ClusterPropagationPolicyPermanentIDLabel)
401+
if policyName != "" && claimedID != "" {
402+
policyObject, err := d.clusterPropagationPolicyLister.Get(policyName)
403+
if err != nil {
404+
if apierrors.IsNotFound(err) {
405+
return false, nil
406+
}
407+
408+
return false, err
409+
}
410+
matchedClusterPropagationPolicy := &policyv1alpha1.ClusterPropagationPolicy{}
411+
if err = helper.ConvertToTypedObject(policyObject, matchedClusterPropagationPolicy); err != nil {
412+
return false, err
413+
}
414+
415+
return util.IsLazyActivationEnabled(matchedClusterPropagationPolicy.Spec.ActivationPreference), nil
416+
}
417+
418+
return false, nil
419+
}
420+
376421
// excludeClusterPolicy excludes cluster propagation policy.
377422
// If propagation policy was claimed, cluster propagation policy should not exist.
378423
func excludeClusterPolicy(obj metav1.Object) (hasClaimedClusterPolicy bool) {

0 commit comments

Comments
 (0)