Skip to content

Commit f3bd8e5

Browse files
jpinsonneaujotak
andauthored
NETOBSERV-2184: improve cache & reconcile events (#1517)
* improve cache & reconcile events restore copy hack cleanup function * Fix test timeout often reached * Watch objects on status change --------- Co-authored-by: Joel Takvorian <[email protected]>
1 parent af6d023 commit f3bd8e5

File tree

10 files changed

+250
-31
lines changed

10 files changed

+250
-31
lines changed

.mk/development.mk

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ pprof:
199199
pprof-pf:
200200
@echo -e "\n==> Port-forwarding..."
201201
oc get pods
202-
kubectl port-forward -n $(NAMESPACE) $(shell kubectl get pod -l app=netobserv-operator -o jsonpath="{.items[0].metadata.name}") 6060
202+
kubectl port-forward -n $(NAMESPACE) $(shell kubectl get pod -l app=netobserv-operator -n $(NAMESPACE) -o jsonpath="{.items[0].metadata.name}") 6060
203203

204204
.PHONY: use-test-console
205205
use-test-console:

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ lint: prereqs ## Run linter (golangci-lint).
319319
./bin/golangci-lint-${GOLANGCI_LINT_VERSION} run --timeout 5m ./...
320320

321321
test: envtest ## Run tests.
322-
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" go test ./... -coverpkg=./... -coverprofile cover.out
322+
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" KUBEBUILDER_CONTROLPLANE_STOP_TIMEOUT="120s" go test ./... -coverpkg=./... -coverprofile cover.out
323323

324324
coverage-report: ## Generate coverage report
325325
go tool cover --func=./cover.out

internal/controller/flowcollector_controller.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,18 +49,18 @@ func Start(ctx context.Context, mgr *manager.Manager) error {
4949
builder := ctrl.NewControllerManagedBy(mgr.Manager).
5050
Named("legacy").
5151
For(&flowslatest.FlowCollector{}, reconcilers.IgnoreStatusChange).
52-
Owns(&appsv1.Deployment{}).
53-
Owns(&appsv1.DaemonSet{}).
54-
Owns(&ascv2.HorizontalPodAutoscaler{}).
55-
Owns(&corev1.Namespace{}).
56-
Owns(&corev1.Service{}).
57-
Owns(&corev1.ServiceAccount{})
52+
Owns(&appsv1.Deployment{}, reconcilers.UpdateOrDeleteOnlyPred).
53+
Owns(&appsv1.DaemonSet{}, reconcilers.UpdateOrDeleteOnlyPred).
54+
Owns(&ascv2.HorizontalPodAutoscaler{}, reconcilers.UpdateOrDeleteOnlyPred).
55+
Owns(&corev1.Namespace{}, reconcilers.UpdateOrDeleteOnlyPred).
56+
Owns(&corev1.Service{}, reconcilers.UpdateOrDeleteOnlyPred).
57+
Owns(&corev1.ServiceAccount{}, reconcilers.UpdateOrDeleteOnlyPred)
5858

5959
if mgr.ClusterInfo.IsOpenShift() {
60-
builder.Owns(&securityv1.SecurityContextConstraints{})
60+
builder.Owns(&securityv1.SecurityContextConstraints{}, reconcilers.UpdateOrDeleteOnlyPred)
6161
}
6262
if mgr.ClusterInfo.HasConsolePlugin() {
63-
builder.Owns(&osv1.ConsolePlugin{})
63+
builder.Owns(&osv1.ConsolePlugin{}, reconcilers.UpdateOrDeleteOnlyPred)
6464
} else {
6565
log.Info("Console not detected: the console plugin is not available")
6666
}

internal/controller/flp/flp_controller.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,12 @@ func Start(ctx context.Context, mgr *manager.Manager) error {
4747
builder := ctrl.NewControllerManagedBy(mgr).
4848
For(&flowslatest.FlowCollector{}, reconcilers.IgnoreStatusChange).
4949
Named("flp").
50-
Owns(&appsv1.Deployment{}).
51-
Owns(&appsv1.DaemonSet{}).
52-
Owns(&ascv2.HorizontalPodAutoscaler{}).
53-
Owns(&corev1.Namespace{}).
54-
Owns(&corev1.Service{}).
55-
Owns(&corev1.ServiceAccount{}).
50+
Owns(&appsv1.Deployment{}, reconcilers.UpdateOrDeleteOnlyPred).
51+
Owns(&appsv1.DaemonSet{}, reconcilers.UpdateOrDeleteOnlyPred).
52+
Owns(&ascv2.HorizontalPodAutoscaler{}, reconcilers.UpdateOrDeleteOnlyPred).
53+
Owns(&corev1.Namespace{}, reconcilers.UpdateOrDeleteOnlyPred).
54+
Owns(&corev1.Service{}, reconcilers.UpdateOrDeleteOnlyPred).
55+
Owns(&corev1.ServiceAccount{}, reconcilers.UpdateOrDeleteOnlyPred).
5656
Watches(
5757
&metricslatest.FlowMetric{},
5858
handler.EnqueueRequestsFromMapFunc(func(_ context.Context, o client.Object) []reconcile.Request {
@@ -61,6 +61,7 @@ func Start(ctx context.Context, mgr *manager.Manager) error {
6161
}
6262
return []reconcile.Request{}
6363
}),
64+
reconcilers.IgnoreStatusChange,
6465
)
6566

6667
ctrl, err := builder.Build(&r)

internal/controller/monitoring/monitoring_controller.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66

77
corev1 "k8s.io/api/core/v1"
88
"k8s.io/apimachinery/pkg/api/errors"
9+
"k8s.io/apimachinery/pkg/labels"
10+
"k8s.io/apimachinery/pkg/selection"
911
"k8s.io/apimachinery/pkg/types"
1012
ctrl "sigs.k8s.io/controller-runtime"
1113
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -42,7 +44,7 @@ func Start(ctx context.Context, mgr *manager.Manager) error {
4244
return ctrl.NewControllerManagedBy(mgr).
4345
For(&flowslatest.FlowCollector{}, reconcilers.IgnoreStatusChange).
4446
Named("monitoring").
45-
Owns(&corev1.Namespace{}).
47+
Owns(&corev1.Namespace{}, reconcilers.UpdateOrDeleteOnlyPred).
4648
Watches(
4749
&metricslatest.FlowMetric{},
4850
handler.EnqueueRequestsFromMapFunc(func(_ context.Context, o client.Object) []reconcile.Request {
@@ -51,6 +53,7 @@ func Start(ctx context.Context, mgr *manager.Manager) error {
5153
}
5254
return []reconcile.Request{}
5355
}),
56+
reconcilers.IgnoreStatusChange,
5457
).
5558
Complete(&r)
5659
}
@@ -129,12 +132,18 @@ func (r *Reconciler) reconcile(ctx context.Context, clh *helper.Client, desired
129132
allMetrics := metrics.MergePredefined(fm.Items, &desired.Spec)
130133
log.WithValues("metrics count", len(allMetrics)).Info("Merged metrics")
131134

135+
req, err := labels.NewRequirement("netobserv-managed", selection.Exists, []string{})
136+
if err != nil {
137+
return r.status.Error("CantQueryRequirement", err)
138+
}
132139
// List existing dashboards
133140
currentDashboards := corev1.ConfigMapList{}
134-
if err := r.Client.List(ctx, &currentDashboards, &client.ListOptions{Namespace: dashboardCMNamespace}); err != nil {
141+
if err := r.Client.List(ctx, &currentDashboards, &client.ListOptions{
142+
Namespace: dashboardCMNamespace,
143+
LabelSelector: labels.NewSelector().Add(*req),
144+
}); err != nil {
135145
return r.status.Error("CantListDashboards", err)
136146
}
137-
filterOwned(&currentDashboards)
138147

139148
// Build desired dashboards
140149
cms := buildFlowMetricsDashboards(allMetrics)
@@ -172,14 +181,6 @@ func getNamespacedFlowsMetric(metrics []metricslatest.FlowMetric) string {
172181
return "netobserv_workload_flows_total"
173182
}
174183

175-
func filterOwned(list *corev1.ConfigMapList) {
176-
for i := len(list.Items) - 1; i >= 0; i-- {
177-
if !helper.IsOwned(&list.Items[i]) {
178-
removeFromList(list, i)
179-
}
180-
}
181-
}
182-
183184
func findAndRemoveConfigMapFromList(list *corev1.ConfigMapList, name string) *corev1.ConfigMap {
184185
for i := len(list.Items) - 1; i >= 0; i-- {
185186
if list.Items[i].Name == name {

internal/controller/networkpolicy/np_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func Start(ctx context.Context, mgr *manager.Manager) error {
3333
return ctrl.NewControllerManagedBy(mgr).
3434
For(&flowslatest.FlowCollector{}, reconcilers.IgnoreStatusChange).
3535
Named("networkPolicy").
36-
Owns(&networkingv1.NetworkPolicy{}).
36+
Owns(&networkingv1.NetworkPolicy{}, reconcilers.UpdateOrDeleteOnlyPred).
3737
Complete(&r)
3838
}
3939

internal/controller/reconcilers/reconcilers.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,18 @@ var (
3434
DeleteFunc: func(_ event.DeleteEvent) bool { return true },
3535
GenericFunc: func(_ event.GenericEvent) bool { return false },
3636
})
37+
UpdateOrDeleteOnlyPred = builder.WithPredicates(predicate.Funcs{
38+
UpdateFunc: func(e event.UpdateEvent) bool {
39+
// Update only if new object is owned - we want to watch for status changes as well (e.g. to know when a deployment is ready)
40+
return helper.IsOwned(e.ObjectNew)
41+
},
42+
CreateFunc: func(_ event.CreateEvent) bool { return false },
43+
DeleteFunc: func(e event.DeleteEvent) bool {
44+
// Update only if it was owned and confirmed as deleted by the api server
45+
return helper.IsOwned(e.Object) && !e.DeleteStateUnknown
46+
},
47+
GenericFunc: func(_ event.GenericEvent) bool { return false },
48+
})
3749
)
3850

3951
func ReconcileClusterRoleBinding(ctx context.Context, cl *helper.Client, desired *rbacv1.ClusterRoleBinding) error {

internal/pkg/manager/manager.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,21 @@ func NewManager(
6464
log := log.FromContext(ctx)
6565
log.Info("Creating manager")
6666

67-
narrowCache := narrowcache.NewConfig(kcfg, narrowcache.ConfigMaps, narrowcache.Secrets)
67+
narrowCache := narrowcache.NewConfig(kcfg,
68+
narrowcache.ConfigMaps,
69+
narrowcache.ClusterRoles,
70+
narrowcache.ClusterRoleBindings,
71+
narrowcache.Daemonsets,
72+
narrowcache.Deployments,
73+
narrowcache.HorizontalPodAutoscalers,
74+
narrowcache.Namespaces,
75+
narrowcache.NetworkPolicies,
76+
narrowcache.Roles,
77+
narrowcache.RoleBindings,
78+
narrowcache.Secrets,
79+
narrowcache.Services,
80+
narrowcache.ServiceAccounts,
81+
)
6882
opts.Client = client.Options{Cache: narrowCache.ControllerRuntimeClientCacheOptions()}
6983

7084
internalManager, err := ctrl.NewManager(kcfg, *opts)

internal/pkg/narrowcache/client.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"k8s.io/apimachinery/pkg/watch"
1414
"k8s.io/client-go/kubernetes"
1515
"k8s.io/client-go/util/workqueue"
16-
1716
"sigs.k8s.io/controller-runtime/pkg/client"
1817
"sigs.k8s.io/controller-runtime/pkg/event"
1918
"sigs.k8s.io/controller-runtime/pkg/handler"
@@ -92,7 +91,8 @@ func (c *Client) getAndCreateWatchIfNeeded(ctx context.Context, info GVKInfo, gv
9291
}
9392

9493
// Store fetched object
95-
err = c.setToCache(objKey, fetched)
94+
obj := info.Cleanup(fetched)
95+
err = c.setToCache(objKey, obj)
9696
if err != nil {
9797
return nil, objKey, err
9898
}
@@ -142,6 +142,7 @@ func (c *Client) setToCache(key string, obj runtime.Object) error {
142142
if !ok {
143143
return fmt.Errorf("could not convert runtime.Object to client.Object")
144144
}
145+
145146
c.wmut.Lock()
146147
defer c.wmut.Unlock()
147148
if ca := c.watchedObjects[key]; ca != nil {

0 commit comments

Comments
 (0)