Skip to content

Commit 72dec28

Browse files
XiShanYongYe-Changliaolecheng
authored andcommitted
ensure resource interpreter cache sync before starting controllers
Signed-off-by: changzhen <[email protected]>
1 parent 06aad71 commit 72dec28

File tree

9 files changed

+142
-44
lines changed

9 files changed

+142
-44
lines changed

cmd/agent/app/agent.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -260,10 +260,9 @@ func setupControllers(ctx context.Context, mgr controllerruntime.Manager, opts *
260260
sharedFactory.WaitForCacheSync(ctx.Done())
261261

262262
resourceInterpreter := resourceinterpreter.NewResourceInterpreter(controlPlaneInformerManager, serviceLister)
263-
if err := mgr.Add(resourceInterpreter); err != nil {
264-
return fmt.Errorf("failed to setup custom resource interpreter: %w", err)
263+
if err := resourceInterpreter.Start(ctx); err != nil {
264+
return fmt.Errorf("failed to start resource interpreter: %w", err)
265265
}
266-
267266
rateLimiterGetter := util.GetClusterRateLimiterGetter().SetDefaultLimits(opts.ClusterAPIQPS, opts.ClusterAPIBurst)
268267
clusterClientOption := &util.ClientOption{RateLimiterGetter: rateLimiterGetter.GetRateLimiter}
269268

cmd/controller-manager/app/controllermanager.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -839,8 +839,8 @@ func setupControllers(ctx context.Context, mgr controllerruntime.Manager, opts *
839839
sharedFactory.WaitForCacheSync(ctx.Done())
840840

841841
resourceInterpreter := resourceinterpreter.NewResourceInterpreter(controlPlaneInformerManager, serviceLister)
842-
if err := mgr.Add(resourceInterpreter); err != nil {
843-
klog.Fatalf("Failed to setup custom resource interpreter: %v", err)
842+
if err := resourceInterpreter.Start(ctx); err != nil {
843+
klog.Fatalf("Failed to start resource interpreter: %v", err)
844844
}
845845
rateLimiterGetter := util.GetClusterRateLimiterGetter().SetDefaultLimits(opts.ClusterAPIQPS, opts.ClusterAPIBurst)
846846
clusterClientOption := &util.ClientOption{RateLimiterGetter: rateLimiterGetter.GetRateLimiter}

pkg/resourceinterpreter/customized/declarative/configmanager/manager.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,21 +27,21 @@ import (
2727
"k8s.io/klog/v2"
2828

2929
configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1"
30+
"github.com/karmada-io/karmada/pkg/util"
3031
"github.com/karmada-io/karmada/pkg/util/fedinformer"
3132
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
3233
"github.com/karmada-io/karmada/pkg/util/helper"
3334
)
3435

35-
var resourceInterpreterCustomizationsGVR = schema.GroupVersionResource{
36-
Group: configv1alpha1.GroupVersion.Group,
37-
Version: configv1alpha1.GroupVersion.Version,
38-
Resource: "resourceinterpretercustomizations",
39-
}
40-
4136
// ConfigManager can list custom resource interpreter.
4237
type ConfigManager interface {
4338
CustomAccessors() map[schema.GroupVersionKind]CustomAccessor
4439
HasSynced() bool
40+
// LoadConfig is used to load ResourceInterpreterCustomization into the cache,
41+
// it requires the provided customizations to be a full list of objects.
42+
// It is recommended to be called during startup. After called, HasSynced() will always
43+
// return true, and CustomAccessors() will return a map of CustomAccessor containing
44+
// all ResourceInterpreterCustomization configurations.
4545
LoadConfig(customizations []*configv1alpha1.ResourceInterpreterCustomization)
4646
}
4747

@@ -81,12 +81,12 @@ func NewInterpreterConfigManager(informer genericmanager.SingleClusterInformerMa
8181
// In interpret command, rules are not loaded from server, so we don't start informer for it.
8282
if informer != nil {
8383
manager.informer = informer
84-
manager.lister = informer.Lister(resourceInterpreterCustomizationsGVR)
84+
manager.lister = informer.Lister(util.ResourceInterpreterCustomizationsGVR)
8585
configHandlers := fedinformer.NewHandlerOnEvents(
8686
func(_ interface{}) { _ = manager.updateConfiguration() },
8787
func(_, _ interface{}) { _ = manager.updateConfiguration() },
8888
func(_ interface{}) { _ = manager.updateConfiguration() })
89-
informer.ForResource(resourceInterpreterCustomizationsGVR, configHandlers)
89+
informer.ForResource(util.ResourceInterpreterCustomizationsGVR, configHandlers)
9090
}
9191

9292
return manager
@@ -102,7 +102,7 @@ func (configManager *interpreterConfigManager) updateConfiguration() error {
102102
if configManager.informer == nil {
103103
return errors.New("informer manager is not configured")
104104
}
105-
if !configManager.informer.IsInformerSynced(resourceInterpreterCustomizationsGVR) {
105+
if !configManager.informer.IsInformerSynced(util.ResourceInterpreterCustomizationsGVR) {
106106
return errors.New("informer of ResourceInterpreterCustomization not synced")
107107
}
108108

pkg/resourceinterpreter/customized/webhook/configmanager/manager.go

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,26 +23,26 @@ import (
2323
"sync/atomic"
2424

2525
"k8s.io/apimachinery/pkg/labels"
26-
"k8s.io/apimachinery/pkg/runtime/schema"
2726
"k8s.io/client-go/tools/cache"
2827
"k8s.io/klog/v2"
2928

3029
configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1"
30+
"github.com/karmada-io/karmada/pkg/util"
3131
"github.com/karmada-io/karmada/pkg/util/fedinformer"
3232
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
3333
"github.com/karmada-io/karmada/pkg/util/helper"
3434
)
3535

36-
var resourceExploringWebhookConfigurationsGVR = schema.GroupVersionResource{
37-
Group: configv1alpha1.GroupVersion.Group,
38-
Version: configv1alpha1.GroupVersion.Version,
39-
Resource: "resourceinterpreterwebhookconfigurations",
40-
}
41-
4236
// ConfigManager can list dynamic webhooks.
4337
type ConfigManager interface {
4438
HookAccessors() []WebhookAccessor
4539
HasSynced() bool
40+
// LoadConfig is used to load ResourceInterpreterWebhookConfiguration into the cache,
41+
// it requires the provided webhookConfigurations to be a full list of objects.
42+
// It is recommended to be called during startup. After called, HasSynced() will always
43+
// return true, and HookAccessors() will return a list of WebhookAccessor containing
44+
// all ResourceInterpreterWebhookConfiguration configurations.
45+
LoadConfig(webhookConfigurations []*configv1alpha1.ResourceInterpreterWebhookConfiguration)
4646
}
4747

4848
// interpreterConfigManager collect the resource interpreter webhook configuration.
@@ -75,7 +75,7 @@ func (m *interpreterConfigManager) HasSynced() bool {
7575
// NewExploreConfigManager return a new interpreterConfigManager with resourceinterpreterwebhookconfigurations handlers.
7676
func NewExploreConfigManager(inform genericmanager.SingleClusterInformerManager) ConfigManager {
7777
manager := &interpreterConfigManager{
78-
lister: inform.Lister(resourceExploringWebhookConfigurationsGVR),
78+
lister: inform.Lister(util.ResourceInterpreterWebhookConfigurationsGVR),
7979
}
8080

8181
manager.configuration.Store([]WebhookAccessor{})
@@ -85,7 +85,7 @@ func NewExploreConfigManager(inform genericmanager.SingleClusterInformerManager)
8585
func(_ interface{}) { _ = manager.updateConfiguration() },
8686
func(_, _ interface{}) { _ = manager.updateConfiguration() },
8787
func(_ interface{}) { _ = manager.updateConfiguration() })
88-
inform.ForResource(resourceExploringWebhookConfigurationsGVR, configHandlers)
88+
inform.ForResource(util.ResourceInterpreterWebhookConfigurationsGVR, configHandlers)
8989

9090
return manager
9191
}
@@ -100,7 +100,7 @@ func (m *interpreterConfigManager) updateConfiguration() error {
100100
if m.informer == nil {
101101
return errors.New("informer manager is not configured")
102102
}
103-
if !m.informer.IsInformerSynced(resourceExploringWebhookConfigurationsGVR) {
103+
if !m.informer.IsInformerSynced(util.ResourceInterpreterWebhookConfigurationsGVR) {
104104
return errors.New("informer of ResourceInterpreterWebhookConfiguration not synced")
105105
}
106106

@@ -109,26 +109,26 @@ func (m *interpreterConfigManager) updateConfiguration() error {
109109
return err
110110
}
111111

112-
configs := make([]*configv1alpha1.ResourceInterpreterWebhookConfiguration, 0)
113-
for _, c := range configurations {
114-
unstructuredConfig, err := helper.ToUnstructured(c)
115-
if err != nil {
116-
return err
117-
}
118-
112+
configs := make([]*configv1alpha1.ResourceInterpreterWebhookConfiguration, len(configurations))
113+
for index, c := range configurations {
119114
config := &configv1alpha1.ResourceInterpreterWebhookConfiguration{}
120-
err = helper.ConvertToTypedObject(unstructuredConfig, config)
115+
err = helper.ConvertToTypedObject(c, config)
121116
if err != nil {
122117
return err
123118
}
124-
configs = append(configs, config)
119+
configs[index] = config
125120
}
126121

127-
m.configuration.Store(mergeResourceExploreWebhookConfigurations(configs))
128-
m.initialSynced.Store(true)
122+
m.LoadConfig(configs)
129123
return nil
130124
}
131125

126+
// LoadConfig loads the webhook configurations and updates the initialSynced flag to true.
127+
func (m *interpreterConfigManager) LoadConfig(webhookConfigurations []*configv1alpha1.ResourceInterpreterWebhookConfiguration) {
128+
m.configuration.Store(mergeResourceExploreWebhookConfigurations(webhookConfigurations))
129+
m.initialSynced.Store(true)
130+
}
131+
132132
func mergeResourceExploreWebhookConfigurations(configurations []*configv1alpha1.ResourceInterpreterWebhookConfiguration) []WebhookAccessor {
133133
sort.SliceStable(configurations, func(i, j int) bool {
134134
return configurations[i].Name < configurations[j].Name

pkg/resourceinterpreter/customized/webhook/configmanager/manager_test.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232

3333
configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1"
3434
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
35+
"github.com/karmada-io/karmada/pkg/util/helper"
3536
)
3637

3738
func TestNewExploreConfigManager(t *testing.T) {
@@ -136,11 +137,23 @@ func TestHasSynced(t *testing.T) {
136137

137138
for _, tt := range tests {
138139
t.Run(tt.name, func(t *testing.T) {
140+
// Convert typed objects to unstructured objects for proper testing
141+
unstructuredItems := make([]runtime.Object, len(tt.listResult))
142+
for i, config := range tt.listResult {
143+
if config != nil {
144+
unstructuredObj, err := helper.ToUnstructured(config)
145+
assert.NoError(t, err)
146+
unstructuredItems[i] = unstructuredObj
147+
} else {
148+
unstructuredItems[i] = config
149+
}
150+
}
151+
139152
manager := &interpreterConfigManager{
140153
informer: tt.informer,
141154
lister: &mockLister{
142155
err: tt.listErr,
143-
items: tt.listResult,
156+
items: unstructuredItems,
144157
},
145158
}
146159
manager.initialSynced.Store(tt.initialSynced)
@@ -274,9 +287,21 @@ func TestUpdateConfiguration(t *testing.T) {
274287

275288
for _, tt := range tests {
276289
t.Run(tt.name, func(t *testing.T) {
290+
// Convert typed objects to unstructured objects for proper testing
291+
unstructuredItems := make([]runtime.Object, len(tt.configs))
292+
for i, config := range tt.configs {
293+
if config != nil {
294+
unstructuredObj, err := helper.ToUnstructured(config)
295+
assert.NoError(t, err)
296+
unstructuredItems[i] = unstructuredObj
297+
} else {
298+
unstructuredItems[i] = config
299+
}
300+
}
301+
277302
manager := &interpreterConfigManager{
278303
lister: &mockLister{
279-
items: tt.configs,
304+
items: unstructuredItems,
280305
err: tt.listErr,
281306
},
282307
informer: tt.informer,

pkg/resourceinterpreter/customized/webhook/customized.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,3 +355,8 @@ func (e *CustomizedInterpreter) InterpretHealth(ctx context.Context, attributes
355355

356356
return response.Healthy, matched, nil
357357
}
358+
359+
// LoadConfig loads the webhook configurations.
360+
func (e *CustomizedInterpreter) LoadConfig(webhookConfigurations []*configv1alpha1.ResourceInterpreterWebhookConfiguration) {
361+
e.hookManager.LoadConfig(webhookConfigurations)
362+
}

pkg/resourceinterpreter/customized/webhook/customized_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1085,6 +1085,12 @@ func (m *mockConfigManager) HookAccessors() []configmanager.WebhookAccessor {
10851085
return m.hooks
10861086
}
10871087

1088+
func (m *mockConfigManager) LoadConfig(_ []*configv1alpha1.ResourceInterpreterWebhookConfiguration) {
1089+
// Mock implementation: in a real test, we might want to process the configurations
1090+
// and update the hooks accordingly. For now, this is a no-op implementation.
1091+
// This allows the mock to satisfy the ConfigManager interface.
1092+
}
1093+
10881094
// mockWebhookAccessor implements configmanager.WebhookAccessor interface for testing
10891095
type mockWebhookAccessor struct {
10901096
uid string

pkg/resourceinterpreter/interpreter.go

Lines changed: 60 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121

2222
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
23+
"k8s.io/apimachinery/pkg/labels"
2324
"k8s.io/apimachinery/pkg/runtime"
2425
"k8s.io/apimachinery/pkg/runtime/schema"
2526
corev1 "k8s.io/client-go/listers/core/v1"
@@ -32,12 +33,14 @@ import (
3233
"github.com/karmada-io/karmada/pkg/resourceinterpreter/customized/webhook/request"
3334
"github.com/karmada-io/karmada/pkg/resourceinterpreter/default/native"
3435
"github.com/karmada-io/karmada/pkg/resourceinterpreter/default/thirdparty"
36+
"github.com/karmada-io/karmada/pkg/util"
3537
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
38+
"github.com/karmada-io/karmada/pkg/util/helper"
3639
)
3740

3841
// ResourceInterpreter manages both default and customized webhooks to interpret custom resource structure.
3942
type ResourceInterpreter interface {
40-
// Start starts running the component and will never stop running until the context is closed or an error occurs.
43+
// Start initializes the resource interpreter and performs cache synchronization.
4144
Start(ctx context.Context) (err error)
4245

4346
// HookEnabled tells if any hook exist for specific resource type and operation.
@@ -85,13 +88,16 @@ type customResourceInterpreterImpl struct {
8588
defaultInterpreter *native.DefaultInterpreter
8689
}
8790

88-
// Start starts running the component and will never stop running until the context is closed or an error occurs.
89-
func (i *customResourceInterpreterImpl) Start(ctx context.Context) (err error) {
90-
klog.Infof("Starting custom resource interpreter.")
91+
// Start initializes all interpreters and load all ResourceInterpreterCustomization and
92+
// ResourceInterpreterWebhookConfiguration configurations into the cache.
93+
// It is recommended to be called before all controllers. After called, the resource interpreter
94+
// will be ready to interpret custom resources.
95+
func (i *customResourceInterpreterImpl) Start(_ context.Context) (err error) {
96+
klog.Infoln("Starting resource interpreter.")
9197

9298
i.customizedInterpreter, err = webhook.NewCustomizedInterpreter(i.informer, i.serviceLister)
9399
if err != nil {
94-
return
100+
return err
95101
}
96102
i.configurableInterpreter = declarative.NewConfigurableInterpreter(i.informer)
97103

@@ -100,8 +106,12 @@ func (i *customResourceInterpreterImpl) Start(ctx context.Context) (err error) {
100106

101107
i.informer.Start()
102108
i.informer.WaitForCacheSync()
103-
<-ctx.Done()
104-
klog.Infof("Stopped as context canceled.")
109+
110+
if err = i.loadConfig(); err != nil {
111+
return err
112+
}
113+
114+
klog.Infoln("Resource interpreter started.")
105115
return nil
106116
}
107117

@@ -339,3 +349,46 @@ func (i *customResourceInterpreterImpl) InterpretHealth(object *unstructured.Uns
339349
healthy, err = i.defaultInterpreter.InterpretHealth(object)
340350
return
341351
}
352+
353+
// loadConfig loads the full set of ResourceInterpreterCustomization and
354+
// ResourceInterpreterWebhookConfiguration configurations into the cache. It avoids resource interpreter
355+
// parsing errors when the resource interpreter starts and the cache is not synchronized.
356+
func (i *customResourceInterpreterImpl) loadConfig() error {
357+
customizations, err := i.informer.Lister(util.ResourceInterpreterCustomizationsGVR).List(labels.Everything())
358+
if err != nil {
359+
klog.Errorf("Failed to list resourceinterpretercustomizations: %v", err)
360+
return err
361+
}
362+
klog.V(5).Infof("Found %d resourceinterpretercustomizations", len(customizations))
363+
364+
declareConfigs := make([]*configv1alpha1.ResourceInterpreterCustomization, len(customizations))
365+
for index, c := range customizations {
366+
config := &configv1alpha1.ResourceInterpreterCustomization{}
367+
if err = helper.ConvertToTypedObject(c, config); err != nil {
368+
klog.Errorf("Failed to convert resourceinterpretercustomization: %v", err)
369+
return err
370+
}
371+
declareConfigs[index] = config
372+
}
373+
i.configurableInterpreter.LoadConfig(declareConfigs)
374+
375+
webhooks, err := i.informer.Lister(util.ResourceInterpreterWebhookConfigurationsGVR).List(labels.Everything())
376+
if err != nil {
377+
klog.Errorf("Failed to list resourceinterpreterwebhookconfigurations: %v", err)
378+
return err
379+
}
380+
klog.V(5).Infof("Found %d resourceinterpreterwebhookconfigurations", len(webhooks))
381+
382+
webhookConfigs := make([]*configv1alpha1.ResourceInterpreterWebhookConfiguration, len(webhooks))
383+
for index, c := range webhooks {
384+
config := &configv1alpha1.ResourceInterpreterWebhookConfiguration{}
385+
if err = helper.ConvertToTypedObject(c, config); err != nil {
386+
klog.Errorf("Failed to convert resourceinterpreterwebhookconfiguration: %v", err)
387+
return err
388+
}
389+
webhookConfigs[index] = config
390+
}
391+
i.customizedInterpreter.LoadConfig(webhookConfigs)
392+
393+
return nil
394+
}

pkg/util/constants.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121

2222
discoveryv1 "k8s.io/api/discovery/v1"
2323

24+
configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1"
2425
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
2526
)
2627

@@ -249,6 +250,15 @@ var (
249250
EndpointSliceGVK = discoveryv1.SchemeGroupVersion.WithKind("EndpointSlice")
250251
)
251252

253+
// Define resource group version resource.
254+
var (
255+
// ResourceInterpreterCustomizationsGVR is the GroupVersionResource of ResourceInterpreterCustomizations.
256+
ResourceInterpreterCustomizationsGVR = configv1alpha1.SchemeGroupVersion.WithResource("resourceinterpretercustomizations")
257+
258+
// ResourceInterpreterWebhookConfigurationsGVR is the GroupVersionResource of ResourceInterpreterWebhookConfigurations.
259+
ResourceInterpreterWebhookConfigurationsGVR = configv1alpha1.SchemeGroupVersion.WithResource("resourceinterpreterwebhookconfigurations")
260+
)
261+
252262
const (
253263
// DefaultFilePerm default file perm
254264
DefaultFilePerm = 0640

0 commit comments

Comments
 (0)