From 40f066a3a1c173b1f8c6137c8f3c01d7eebbc8b6 Mon Sep 17 00:00:00 2001 From: Jenny Shu <28537278+jenshu@users.noreply.github.com> Date: Thu, 8 Dec 2022 09:46:12 -0500 Subject: [PATCH] cluster watcher namespaces (#397) * cluster with namespace * pass in namespace instead * list * remove extra logs * cl * add arg * add tests --- .../v0.25.0/secrets-watch-namespaces.yaml | 6 + codegen/render/kube_multicluster_test.go | 4 +- pkg/bootstrap/bootstrap.go | 4 +- pkg/multicluster/kubeconfig/predicate.go | 68 +++++--- pkg/multicluster/kubeconfig/predicate_test.go | 146 ++++++++++++++++++ pkg/multicluster/watch/watcher.go | 27 ++-- 6 files changed, 221 insertions(+), 34 deletions(-) create mode 100644 changelog/v0.25.0/secrets-watch-namespaces.yaml create mode 100644 pkg/multicluster/kubeconfig/predicate_test.go diff --git a/changelog/v0.25.0/secrets-watch-namespaces.yaml b/changelog/v0.25.0/secrets-watch-namespaces.yaml new file mode 100644 index 000000000..6d2d4a6ab --- /dev/null +++ b/changelog/v0.25.0/secrets-watch-namespaces.yaml @@ -0,0 +1,6 @@ +changelog: + - type: BREAKING_CHANGE + description: > + NewClusterWatcher now takes in a third argument (`watchNamespaces`) to constrain the namespaces in which + secrets will be watched. If empty, then all namespaces will be watched (which is the existing behavior). + issueLink: https://github.com/solo-io/skv2/issues/399 diff --git a/codegen/render/kube_multicluster_test.go b/codegen/render/kube_multicluster_test.go index a07401f35..1ad174611 100644 --- a/codegen/render/kube_multicluster_test.go +++ b/codegen/render/kube_multicluster_test.go @@ -120,7 +120,7 @@ var _ = WithRemoteClusterContextDescribe("Multicluster", func() { Describe("clientset", func() { It("works", func() { - cw := watch.NewClusterWatcher(ctx, manager.Options{Namespace: ns}) + cw := watch.NewClusterWatcher(ctx, manager.Options{Namespace: ns}, nil) err := cw.Run(masterManager) Expect(err).NotTo(HaveOccurred()) mcClientset := multicluster.NewClient(cw) @@ -187,7 +187,7 @@ var _ = WithRemoteClusterContextDescribe("Multicluster", func() { } BeforeEach(func() { - cw = watch.NewClusterWatcher(ctx, manager.Options{Namespace: ns}) + cw = watch.NewClusterWatcher(ctx, manager.Options{Namespace: ns}, nil) }) It("works when a loop is registered before the watcher is started", func() { diff --git a/pkg/bootstrap/bootstrap.go b/pkg/bootstrap/bootstrap.go index f44aa5cb8..8d88be80f 100644 --- a/pkg/bootstrap/bootstrap.go +++ b/pkg/bootstrap/bootstrap.go @@ -155,10 +155,12 @@ func StartMulti( if !localMode { // construct multicluster watcher and client clusterWatcher = watch.NewClusterWatcher( - ctx, manager.Options{ + ctx, + manager.Options{ Namespace: "", // TODO (ilackarms): support configuring specific watch namespaces on remote clusters Scheme: mgr.GetScheme(), }, + nil, ) mcClient = multicluster.NewClient(clusterWatcher) diff --git a/pkg/multicluster/kubeconfig/predicate.go b/pkg/multicluster/kubeconfig/predicate.go index 27845d656..3378b4dd6 100644 --- a/pkg/multicluster/kubeconfig/predicate.go +++ b/pkg/multicluster/kubeconfig/predicate.go @@ -3,31 +3,44 @@ package kubeconfig import ( "reflect" - "sigs.k8s.io/controller-runtime/pkg/client" - corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/predicate" ) -var Predicate = predicate.Funcs{ - CreateFunc: func(e event.CreateEvent) bool { - return isKubeConfigSecret(e.Object) - }, - DeleteFunc: func(e event.DeleteEvent) bool { - return isKubeConfigSecret(e.Object) - }, - UpdateFunc: func(e event.UpdateEvent) bool { - if isKubeConfigSecret(e.ObjectNew) { - // Ignore metadata changes. - oldSecret, newSecret := e.ObjectOld.(*corev1.Secret), e.ObjectNew.(*corev1.Secret) - return !reflect.DeepEqual(oldSecret.Data, newSecret.Data) - } - return false - }, - GenericFunc: func(e event.GenericEvent) bool { - return isKubeConfigSecret(e.Object) - }, +func BuildPredicate(watchNamespaces []string) predicate.Funcs { + return predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { + if !isNamespaceWatched(watchNamespaces, e.Object.GetNamespace()) { + return false + } + return isKubeConfigSecret(e.Object) + }, + DeleteFunc: func(e event.DeleteEvent) bool { + if !isNamespaceWatched(watchNamespaces, e.Object.GetNamespace()) { + return false + } + return isKubeConfigSecret(e.Object) + }, + UpdateFunc: func(e event.UpdateEvent) bool { + if !isNamespaceWatched(watchNamespaces, e.ObjectNew.GetNamespace()) { + return false + } + if isKubeConfigSecret(e.ObjectNew) { + // Ignore metadata changes. + oldSecret, newSecret := e.ObjectOld.(*corev1.Secret), e.ObjectNew.(*corev1.Secret) + return !reflect.DeepEqual(oldSecret.Data, newSecret.Data) + } + return false + }, + GenericFunc: func(e event.GenericEvent) bool { + if !isNamespaceWatched(watchNamespaces, e.Object.GetNamespace()) { + return false + } + return isKubeConfigSecret(e.Object) + }, + } } func isKubeConfigSecret(obj client.Object) bool { @@ -36,3 +49,18 @@ func isKubeConfigSecret(obj client.Object) bool { } return false } + +// If watchNamespaces is empty, then watch all namespaces. Otherwise, watch +// only the specified namespaces. +func isNamespaceWatched(watchNamespaces []string, namespace string) bool { + if len(watchNamespaces) == 0 { + return true + } + + for _, ns := range watchNamespaces { + if ns == namespace { + return true + } + } + return false +} diff --git a/pkg/multicluster/kubeconfig/predicate_test.go b/pkg/multicluster/kubeconfig/predicate_test.go new file mode 100644 index 000000000..dff777f88 --- /dev/null +++ b/pkg/multicluster/kubeconfig/predicate_test.go @@ -0,0 +1,146 @@ +package kubeconfig_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + . "github.com/solo-io/skv2/pkg/multicluster/kubeconfig" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/event" +) + +var _ = Describe("KubeConfig Secret Predicate", func() { + Describe("BuildPredicate", func() { + + It("does not process events for non-secrets", func() { + watchNamespaces := []string{} + pred := BuildPredicate(watchNamespaces) + + obj := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "configmap1", + Namespace: "ns1", + }, + Data: map[string]string{ + "a": "b", + }, + } + Expect(pred.CreateFunc(event.CreateEvent{Object: obj})).To(BeFalse()) + Expect(pred.DeleteFunc(event.DeleteEvent{Object: obj})).To(BeFalse()) + Expect(pred.UpdateFunc(event.UpdateEvent{ObjectNew: obj})).To(BeFalse()) + Expect(pred.GenericFunc(event.GenericEvent{Object: obj})).To(BeFalse()) + }) + + It("does not process events for non-kubeconfig secrets", func() { + watchNamespaces := []string{} + pred := BuildPredicate(watchNamespaces) + + obj := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "secret1", + Namespace: "ns1", + }, + Type: corev1.SecretTypeOpaque, + } + Expect(pred.CreateFunc(event.CreateEvent{Object: obj})).To(BeFalse()) + Expect(pred.DeleteFunc(event.DeleteEvent{Object: obj})).To(BeFalse()) + Expect(pred.UpdateFunc(event.UpdateEvent{ObjectNew: obj})).To(BeFalse()) + Expect(pred.GenericFunc(event.GenericEvent{Object: obj})).To(BeFalse()) + }) + + It("processes events in all namespaces when watchNamespaces is empty", func() { + watchNamespaces := []string{} + pred := BuildPredicate(watchNamespaces) + + objOld := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "secret1", + Namespace: "ns1", + }, + Type: SecretType, + Data: map[string][]byte{ + "a": []byte("b"), + }, + } + objNew := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "secret1", + Namespace: "ns1", + }, + Type: SecretType, + Data: map[string][]byte{ + "c": []byte("d"), + }, + } + Expect(pred.CreateFunc(event.CreateEvent{Object: objNew})).To(BeTrue()) + Expect(pred.DeleteFunc(event.DeleteEvent{Object: objNew})).To(BeTrue()) + Expect(pred.UpdateFunc(event.UpdateEvent{ObjectOld: objOld, ObjectNew: objNew})).To(BeTrue()) + Expect(pred.GenericFunc(event.GenericEvent{Object: objNew})).To(BeTrue()) + }) + + It("processes events only in watchNamespaces", func() { + watchNamespaces := []string{"ns1", "ns2"} + pred := BuildPredicate(watchNamespaces) + + objOld := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "secret1", + Namespace: "ns2", + }, + Type: SecretType, + Data: map[string][]byte{ + "a": []byte("b"), + }, + } + objNew := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "secret1", + Namespace: "ns2", + }, + Type: SecretType, + Data: map[string][]byte{ + "c": []byte("d"), + }, + } + Expect(pred.CreateFunc(event.CreateEvent{Object: objNew})).To(BeTrue()) + Expect(pred.DeleteFunc(event.DeleteEvent{Object: objNew})).To(BeTrue()) + Expect(pred.UpdateFunc(event.UpdateEvent{ObjectOld: objOld, ObjectNew: objNew})).To(BeTrue()) + Expect(pred.GenericFunc(event.GenericEvent{Object: objNew})).To(BeTrue()) + + // change to a non-watched namespace + objOld.ObjectMeta.Namespace = "ns3" + objNew.ObjectMeta.Namespace = "ns3" + Expect(pred.CreateFunc(event.CreateEvent{Object: objNew})).To(BeFalse()) + Expect(pred.DeleteFunc(event.DeleteEvent{Object: objNew})).To(BeFalse()) + Expect(pred.UpdateFunc(event.UpdateEvent{ObjectOld: objOld, ObjectNew: objNew})).To(BeFalse()) + Expect(pred.GenericFunc(event.GenericEvent{Object: objNew})).To(BeFalse()) + }) + + It("does not process update events when secret didn't change", func() { + watchNamespaces := []string{} + pred := BuildPredicate(watchNamespaces) + + objOld := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "secret1", + Namespace: "ns1", + }, + Type: SecretType, + Data: map[string][]byte{ + "a": []byte("b"), + }, + } + objNew := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "secret1", + Namespace: "ns1", + }, + Type: SecretType, + Data: map[string][]byte{ + "a": []byte("b"), + }, + } + Expect(pred.UpdateFunc(event.UpdateEvent{ObjectOld: objOld, ObjectNew: objNew})).To(BeFalse()) + }) + }) +}) diff --git a/pkg/multicluster/watch/watcher.go b/pkg/multicluster/watch/watcher.go index 4b8a2b3bc..b8593d867 100644 --- a/pkg/multicluster/watch/watcher.go +++ b/pkg/multicluster/watch/watcher.go @@ -20,29 +20,34 @@ import ( ) type clusterWatcher struct { - ctx context.Context - handlers *handlerList - managers *managerSet - options manager.Options + ctx context.Context + handlers *handlerList + managers *managerSet + options manager.Options + watchNamespaces []string } var _ multicluster.Interface = &clusterWatcher{} -// NewClusterWatcher returns a *clusterWatcher. +// NewClusterWatcher returns a *clusterWatcher, which watches for changes to kubeconfig secrets +// (which contain kubeconfigs for remote clusters). // When ctx is cancelled, all cluster managers started by the clusterWatcher are stopped. // Provided manager.Options are applied to all managers started by the clusterWatcher. -func NewClusterWatcher(ctx context.Context, options manager.Options) *clusterWatcher { +// If watchNamespaces is not empty, only secrets in the given namespaces will be watched. If empty, secrets in +// all namespaces will be watched. +func NewClusterWatcher(ctx context.Context, options manager.Options, watchNamespaces []string) *clusterWatcher { return &clusterWatcher{ - ctx: ctx, - handlers: newHandlerList(), - managers: newManagerSet(), - options: options, + ctx: ctx, + handlers: newHandlerList(), + managers: newManagerSet(), + options: options, + watchNamespaces: watchNamespaces, } } func (c *clusterWatcher) Run(master manager.Manager) error { loop := controller.NewSecretReconcileLoop("cluster watcher", master, reconcile.Options{}) - return loop.RunSecretReconciler(c.ctx, c, kubeconfig.Predicate) + return loop.RunSecretReconciler(c.ctx, c, kubeconfig.BuildPredicate(c.watchNamespaces)) } func (c *clusterWatcher) ReconcileSecret(obj *v1.Secret) (reconcile.Result, error) {