Skip to content

Commit

Permalink
cluster watcher namespaces (#397)
Browse files Browse the repository at this point in the history
* cluster with namespace

* pass in namespace instead

* list

* remove extra logs

* cl

* add arg

* add tests
  • Loading branch information
jenshu authored Dec 8, 2022
1 parent a8ae26d commit 40f066a
Show file tree
Hide file tree
Showing 6 changed files with 221 additions and 34 deletions.
6 changes: 6 additions & 0 deletions changelog/v0.25.0/secrets-watch-namespaces.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
changelog:
- type: BREAKING_CHANGE
description: >
NewClusterWatcher now takes in a third argument (`watchNamespaces`) to constrain the namespaces in which
secrets will be watched. If empty, then all namespaces will be watched (which is the existing behavior).
issueLink: https://github.com/solo-io/skv2/issues/399
4 changes: 2 additions & 2 deletions codegen/render/kube_multicluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ var _ = WithRemoteClusterContextDescribe("Multicluster", func() {

Describe("clientset", func() {
It("works", func() {
cw := watch.NewClusterWatcher(ctx, manager.Options{Namespace: ns})
cw := watch.NewClusterWatcher(ctx, manager.Options{Namespace: ns}, nil)
err := cw.Run(masterManager)
Expect(err).NotTo(HaveOccurred())
mcClientset := multicluster.NewClient(cw)
Expand Down Expand Up @@ -187,7 +187,7 @@ var _ = WithRemoteClusterContextDescribe("Multicluster", func() {
}

BeforeEach(func() {
cw = watch.NewClusterWatcher(ctx, manager.Options{Namespace: ns})
cw = watch.NewClusterWatcher(ctx, manager.Options{Namespace: ns}, nil)
})

It("works when a loop is registered before the watcher is started", func() {
Expand Down
4 changes: 3 additions & 1 deletion pkg/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,12 @@ func StartMulti(
if !localMode {
// construct multicluster watcher and client
clusterWatcher = watch.NewClusterWatcher(
ctx, manager.Options{
ctx,
manager.Options{
Namespace: "", // TODO (ilackarms): support configuring specific watch namespaces on remote clusters
Scheme: mgr.GetScheme(),
},
nil,
)

mcClient = multicluster.NewClient(clusterWatcher)
Expand Down
68 changes: 48 additions & 20 deletions pkg/multicluster/kubeconfig/predicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,44 @@ package kubeconfig
import (
"reflect"

"sigs.k8s.io/controller-runtime/pkg/client"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

var Predicate = predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
return isKubeConfigSecret(e.Object)
},
DeleteFunc: func(e event.DeleteEvent) bool {
return isKubeConfigSecret(e.Object)
},
UpdateFunc: func(e event.UpdateEvent) bool {
if isKubeConfigSecret(e.ObjectNew) {
// Ignore metadata changes.
oldSecret, newSecret := e.ObjectOld.(*corev1.Secret), e.ObjectNew.(*corev1.Secret)
return !reflect.DeepEqual(oldSecret.Data, newSecret.Data)
}
return false
},
GenericFunc: func(e event.GenericEvent) bool {
return isKubeConfigSecret(e.Object)
},
func BuildPredicate(watchNamespaces []string) predicate.Funcs {
return predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
if !isNamespaceWatched(watchNamespaces, e.Object.GetNamespace()) {
return false
}
return isKubeConfigSecret(e.Object)
},
DeleteFunc: func(e event.DeleteEvent) bool {
if !isNamespaceWatched(watchNamespaces, e.Object.GetNamespace()) {
return false
}
return isKubeConfigSecret(e.Object)
},
UpdateFunc: func(e event.UpdateEvent) bool {
if !isNamespaceWatched(watchNamespaces, e.ObjectNew.GetNamespace()) {
return false
}
if isKubeConfigSecret(e.ObjectNew) {
// Ignore metadata changes.
oldSecret, newSecret := e.ObjectOld.(*corev1.Secret), e.ObjectNew.(*corev1.Secret)
return !reflect.DeepEqual(oldSecret.Data, newSecret.Data)
}
return false
},
GenericFunc: func(e event.GenericEvent) bool {
if !isNamespaceWatched(watchNamespaces, e.Object.GetNamespace()) {
return false
}
return isKubeConfigSecret(e.Object)
},
}
}

func isKubeConfigSecret(obj client.Object) bool {
Expand All @@ -36,3 +49,18 @@ func isKubeConfigSecret(obj client.Object) bool {
}
return false
}

// If watchNamespaces is empty, then watch all namespaces. Otherwise, watch
// only the specified namespaces.
func isNamespaceWatched(watchNamespaces []string, namespace string) bool {
if len(watchNamespaces) == 0 {
return true
}

for _, ns := range watchNamespaces {
if ns == namespace {
return true
}
}
return false
}
146 changes: 146 additions & 0 deletions pkg/multicluster/kubeconfig/predicate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package kubeconfig_test

import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
. "github.com/solo-io/skv2/pkg/multicluster/kubeconfig"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/event"
)

var _ = Describe("KubeConfig Secret Predicate", func() {
Describe("BuildPredicate", func() {

It("does not process events for non-secrets", func() {
watchNamespaces := []string{}
pred := BuildPredicate(watchNamespaces)

obj := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "configmap1",
Namespace: "ns1",
},
Data: map[string]string{
"a": "b",
},
}
Expect(pred.CreateFunc(event.CreateEvent{Object: obj})).To(BeFalse())
Expect(pred.DeleteFunc(event.DeleteEvent{Object: obj})).To(BeFalse())
Expect(pred.UpdateFunc(event.UpdateEvent{ObjectNew: obj})).To(BeFalse())
Expect(pred.GenericFunc(event.GenericEvent{Object: obj})).To(BeFalse())
})

It("does not process events for non-kubeconfig secrets", func() {
watchNamespaces := []string{}
pred := BuildPredicate(watchNamespaces)

obj := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "secret1",
Namespace: "ns1",
},
Type: corev1.SecretTypeOpaque,
}
Expect(pred.CreateFunc(event.CreateEvent{Object: obj})).To(BeFalse())
Expect(pred.DeleteFunc(event.DeleteEvent{Object: obj})).To(BeFalse())
Expect(pred.UpdateFunc(event.UpdateEvent{ObjectNew: obj})).To(BeFalse())
Expect(pred.GenericFunc(event.GenericEvent{Object: obj})).To(BeFalse())
})

It("processes events in all namespaces when watchNamespaces is empty", func() {
watchNamespaces := []string{}
pred := BuildPredicate(watchNamespaces)

objOld := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "secret1",
Namespace: "ns1",
},
Type: SecretType,
Data: map[string][]byte{
"a": []byte("b"),
},
}
objNew := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "secret1",
Namespace: "ns1",
},
Type: SecretType,
Data: map[string][]byte{
"c": []byte("d"),
},
}
Expect(pred.CreateFunc(event.CreateEvent{Object: objNew})).To(BeTrue())
Expect(pred.DeleteFunc(event.DeleteEvent{Object: objNew})).To(BeTrue())
Expect(pred.UpdateFunc(event.UpdateEvent{ObjectOld: objOld, ObjectNew: objNew})).To(BeTrue())
Expect(pred.GenericFunc(event.GenericEvent{Object: objNew})).To(BeTrue())
})

It("processes events only in watchNamespaces", func() {
watchNamespaces := []string{"ns1", "ns2"}
pred := BuildPredicate(watchNamespaces)

objOld := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "secret1",
Namespace: "ns2",
},
Type: SecretType,
Data: map[string][]byte{
"a": []byte("b"),
},
}
objNew := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "secret1",
Namespace: "ns2",
},
Type: SecretType,
Data: map[string][]byte{
"c": []byte("d"),
},
}
Expect(pred.CreateFunc(event.CreateEvent{Object: objNew})).To(BeTrue())
Expect(pred.DeleteFunc(event.DeleteEvent{Object: objNew})).To(BeTrue())
Expect(pred.UpdateFunc(event.UpdateEvent{ObjectOld: objOld, ObjectNew: objNew})).To(BeTrue())
Expect(pred.GenericFunc(event.GenericEvent{Object: objNew})).To(BeTrue())

// change to a non-watched namespace
objOld.ObjectMeta.Namespace = "ns3"
objNew.ObjectMeta.Namespace = "ns3"
Expect(pred.CreateFunc(event.CreateEvent{Object: objNew})).To(BeFalse())
Expect(pred.DeleteFunc(event.DeleteEvent{Object: objNew})).To(BeFalse())
Expect(pred.UpdateFunc(event.UpdateEvent{ObjectOld: objOld, ObjectNew: objNew})).To(BeFalse())
Expect(pred.GenericFunc(event.GenericEvent{Object: objNew})).To(BeFalse())
})

It("does not process update events when secret didn't change", func() {
watchNamespaces := []string{}
pred := BuildPredicate(watchNamespaces)

objOld := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "secret1",
Namespace: "ns1",
},
Type: SecretType,
Data: map[string][]byte{
"a": []byte("b"),
},
}
objNew := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "secret1",
Namespace: "ns1",
},
Type: SecretType,
Data: map[string][]byte{
"a": []byte("b"),
},
}
Expect(pred.UpdateFunc(event.UpdateEvent{ObjectOld: objOld, ObjectNew: objNew})).To(BeFalse())
})
})
})
27 changes: 16 additions & 11 deletions pkg/multicluster/watch/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,34 @@ import (
)

type clusterWatcher struct {
ctx context.Context
handlers *handlerList
managers *managerSet
options manager.Options
ctx context.Context
handlers *handlerList
managers *managerSet
options manager.Options
watchNamespaces []string
}

var _ multicluster.Interface = &clusterWatcher{}

// NewClusterWatcher returns a *clusterWatcher.
// NewClusterWatcher returns a *clusterWatcher, which watches for changes to kubeconfig secrets
// (which contain kubeconfigs for remote clusters).
// When ctx is cancelled, all cluster managers started by the clusterWatcher are stopped.
// Provided manager.Options are applied to all managers started by the clusterWatcher.
func NewClusterWatcher(ctx context.Context, options manager.Options) *clusterWatcher {
// If watchNamespaces is not empty, only secrets in the given namespaces will be watched. If empty, secrets in
// all namespaces will be watched.
func NewClusterWatcher(ctx context.Context, options manager.Options, watchNamespaces []string) *clusterWatcher {
return &clusterWatcher{
ctx: ctx,
handlers: newHandlerList(),
managers: newManagerSet(),
options: options,
ctx: ctx,
handlers: newHandlerList(),
managers: newManagerSet(),
options: options,
watchNamespaces: watchNamespaces,
}
}

func (c *clusterWatcher) Run(master manager.Manager) error {
loop := controller.NewSecretReconcileLoop("cluster watcher", master, reconcile.Options{})
return loop.RunSecretReconciler(c.ctx, c, kubeconfig.Predicate)
return loop.RunSecretReconciler(c.ctx, c, kubeconfig.BuildPredicate(c.watchNamespaces))
}

func (c *clusterWatcher) ReconcileSecret(obj *v1.Secret) (reconcile.Result, error) {
Expand Down

0 comments on commit 40f066a

Please sign in to comment.