diff --git a/pkg/runtime/cache/cache.go b/pkg/runtime/cache/cache.go index cbf0bb4..e77fae3 100644 --- a/pkg/runtime/cache/cache.go +++ b/pkg/runtime/cache/cache.go @@ -58,6 +58,14 @@ func init() { ) } +// Config is used to configure the caches. +type Config struct { + // WatchScope is a list of namespaces to watch for resources + WatchScope []string + // Ignored is a list of namespaces to ignore + Ignored []string +} + // Caches is used to interact with the different caches type Caches struct { // stopCh is a channel use to stop all the @@ -72,10 +80,10 @@ type Caches struct { } // New instantiate a new Caches object. -func New(log logr.Logger) Caches { +func New(log logr.Logger, config Config) Caches { return Caches{ Accounts: NewAccountCache(log), - Namespaces: NewNamespaceCache(log), + Namespaces: NewNamespaceCache(log, config.WatchScope, config.Ignored), } } diff --git a/pkg/runtime/cache/namespace.go b/pkg/runtime/cache/namespace.go index 096328d..d2e2326 100644 --- a/pkg/runtime/cache/namespace.go +++ b/pkg/runtime/cache/namespace.go @@ -80,51 +80,79 @@ type NamespaceCache struct { log logr.Logger // namespaceInfos maps namespaces names to their known namespaceInfo namespaceInfos map[string]*namespaceInfo + // watchScope is the list of namespaces we are watching + watchScope []string + // ignored is the list of namespaces we are ignoring + ignored []string } // NewNamespaceCache instanciate a new NamespaceCache. -func NewNamespaceCache(log logr.Logger) *NamespaceCache { +func NewNamespaceCache(log logr.Logger, watchScope []string, ignored []string) *NamespaceCache { return &NamespaceCache{ log: log.WithName("cache.namespace"), namespaceInfos: make(map[string]*namespaceInfo), + ignored: ignored, + watchScope: watchScope, } } -// isIgnoredNamespace returns true if an object is of type corev1.Namespace and -// its metadata name is the ACK system namespace, 'kube-system' or -// 'kube-public' -func isIgnoredNamespace(raw interface{}) bool { - object, ok := raw.(*corev1.Namespace) - return ok && - (object.ObjectMeta.Name == ackSystemNamespace || - object.ObjectMeta.Name == "kube-system" || - object.ObjectMeta.Name == "kube-public") +// isIgnoredNamespace returns true if the namespace is ignored +func (c *NamespaceCache) isIgnoredNamespace(namespace string) bool { + for _, ns := range c.ignored { + if namespace == ns { + return true + } + } + return false +} + +// inWatchScope returns true if the namespace is in the watch scope +func (c *NamespaceCache) inWatchScope(namespace string) bool { + if len(c.watchScope) == 0 { + return true + } + for _, ns := range c.watchScope { + if namespace == ns { + return true + } + } + return false +} + +// approvedNamespace returns true if the namespace is not ignored and is in the watch scope +func (c *NamespaceCache) approvedNamespace(namespace string) bool { + return !c.isIgnoredNamespace(namespace) && c.inWatchScope(namespace) } // Run instantiate a new shared informer for namespaces and runs it to begin processing items. func (c *NamespaceCache) Run(clientSet kubernetes.Interface, stopCh <-chan struct{}) { + c.log.V(1).Info("Starting namespace cache", "watchScope", c.watchScope, "ignored", c.ignored) informer := informersv1.NewNamespaceInformer( clientSet, informerResyncPeriod, - k8scache.Indexers{}, + k8scache.Indexers{ + k8scache.NamespaceIndex: k8scache.MetaNamespaceIndexFunc, + }, ) informer.AddEventHandler(k8scache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - if !isIgnoredNamespace(obj) { - ns := obj.(*corev1.Namespace) + // It is guaranteed that the object is of type corev1.Namespace + ns := obj.(*corev1.Namespace) + if c.approvedNamespace(ns.ObjectMeta.Name) { c.setNamespaceInfoFromK8sObject(ns) c.log.V(1).Info("created namespace", "name", ns.ObjectMeta.Name) } }, UpdateFunc: func(orig, desired interface{}) { - if !isIgnoredNamespace(desired) { - ns := desired.(*corev1.Namespace) + ns := desired.(*corev1.Namespace) + if c.approvedNamespace(ns.ObjectMeta.Name) { c.setNamespaceInfoFromK8sObject(ns) c.log.V(1).Info("updated namespace", "name", ns.ObjectMeta.Name) } }, DeleteFunc: func(obj interface{}) { - if !isIgnoredNamespace(obj) { + ns := obj.(*corev1.Namespace) + if c.approvedNamespace(ns.ObjectMeta.Name) { ns := obj.(*corev1.Namespace) c.deleteNamespaceInfo(ns.ObjectMeta.Name) c.log.V(1).Info("deleted namespace", "name", ns.ObjectMeta.Name) diff --git a/pkg/runtime/cache/namespace_test.go b/pkg/runtime/cache/namespace_test.go index 8fe5cc1..86fb240 100644 --- a/pkg/runtime/cache/namespace_test.go +++ b/pkg/runtime/cache/namespace_test.go @@ -15,6 +15,7 @@ package cache_test import ( "context" + "io" "testing" "time" @@ -49,7 +50,7 @@ func TestNamespaceCache(t *testing.T) { fakeLogger := ctrlrtzap.New(ctrlrtzap.UseFlagOptions(&zapOptions)) // initlizing account cache - namespaceCache := ackrtcache.NewNamespaceCache(fakeLogger) + namespaceCache := ackrtcache.NewNamespaceCache(fakeLogger, []string{}, []string{}) stopCh := make(chan struct{}) namespaceCache.Run(k8sClient, stopCh) @@ -129,3 +130,109 @@ func TestNamespaceCache(t *testing.T) { _, ok = namespaceCache.GetDefaultRegion(testNamespace1) require.False(t, ok) } + +func TestScopedNamespaceCache(t *testing.T) { + defaultConfig := ackrtcache.Config{ + WatchScope: []string{"watch-scope", "watch-scope-2"}, + Ignored: []string{"ignored", "ignored-2"}, + } + + testCases := []struct { + name string + createNamespace string + expectCacheHit bool + cacheConfig ackrtcache.Config + }{ + { + name: "namespace in scope", + createNamespace: "watch-scope", + expectCacheHit: true, + cacheConfig: defaultConfig, + }, + { + name: "namespace not in scope", + createNamespace: "watch-scope-3", + expectCacheHit: false, + cacheConfig: defaultConfig, + }, + { + name: "namespace in ignored", + createNamespace: "ignored", + expectCacheHit: false, + cacheConfig: defaultConfig, + }, + { + name: "namespace is nor in scope or ignored", + createNamespace: "random-penguin", + expectCacheHit: false, + cacheConfig: defaultConfig, + }, + { + name: "namespace is in scope and ignored", + createNamespace: "watch-scope-2", + expectCacheHit: true, + cacheConfig: defaultConfig, + }, + { + name: "cache watching all namespaces - namespace in scope", + cacheConfig: ackrtcache.Config{}, + createNamespace: "watch-scope", + expectCacheHit: true, + }, + { + name: "cache watching all namespaces - namespace is ignored", + cacheConfig: ackrtcache.Config{Ignored: []string{"kube-system"}}, + createNamespace: "kube-system", + expectCacheHit: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // create a fake k8s client and fake watcher + k8sClient := k8sfake.NewSimpleClientset() + watcher := watch.NewFake() + k8sClient.PrependWatchReactor("random-penguin", k8stesting.DefaultWatchReactor(watcher, nil)) + + // New logger writing to specific buffer + zapOptions := ctrlrtzap.Options{ + Development: true, + Level: zapcore.InfoLevel, + DestWriter: io.Discard, + } + fakeLogger := ctrlrtzap.New(ctrlrtzap.UseFlagOptions(&zapOptions)) + + // initlizing account cache + namespaceCache := ackrtcache.NewNamespaceCache(fakeLogger, tc.cacheConfig.WatchScope, tc.cacheConfig.Ignored) + stopCh := make(chan struct{}) + + namespaceCache.Run(k8sClient, stopCh) + + // Create namespace with name testNamespace1 + _, err := k8sClient.CoreV1().Namespaces().Create( + context.Background(), + newNamespace(tc.createNamespace), + metav1.CreateOptions{}, + ) + require.Nil(t, err) + + // Need a better way to wait for the cache to be updated + // Thinking informer.WaitForCacheSync() ~ but it's not exported + time.Sleep(time.Millisecond * 50) + + _, found := namespaceCache.GetDefaultRegion(tc.createNamespace) + require.Equal(t, tc.expectCacheHit, found) + }) + } +} + +func newNamespace(name string) *corev1.Namespace { + return &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Annotations: map[string]string{ + ackv1alpha1.AnnotationDefaultRegion: "us-west-2", + }, + }, + } +} diff --git a/pkg/runtime/service_controller.go b/pkg/runtime/service_controller.go index 7b2630e..d84170e 100644 --- a/pkg/runtime/service_controller.go +++ b/pkg/runtime/service_controller.go @@ -14,6 +14,7 @@ package runtime import ( + "fmt" "strings" "sync" @@ -34,6 +35,18 @@ import ( acktypes "github.com/aws-controllers-k8s/runtime/pkg/types" ) +const ( + // NamespaceKubeNodeLease is the name of the Kubernetes namespace that + // contains the kube-node-lease resources (used for node hearthbeats) + NamespaceKubeNodeLease = "kube-node-lease" + // NamespacePublic is the name of the Kubernetes namespace that contains + // the public info (ConfigMaps) + NamespaceKubePublic = "kube-public" + // NamespaceSystem is the name of the Kubernetes namespace where we place + // system components. + NamespaceKubeSystem = "kube-system" +) + // serviceController wraps a number of `controller-runtime.Reconciler` that are // related to a specific AWS service API. type serviceController struct { @@ -187,13 +200,38 @@ func (c *serviceController) BindControllerManager(mgr ctrlrt.Manager, cfg ackcfg c.metaLock.Lock() defer c.metaLock.Unlock() - cache := ackrtcache.New(c.log) - if cfg.WatchNamespace == "" { + namespaces, err := cfg.GetWatchNamespaces() + if err != nil { + return fmt.Errorf("unable to get watch namespaces: %v", err) + } + + cache := ackrtcache.New(c.log, ackrtcache.Config{ + WatchScope: namespaces, + // Default to ignoring the kube-system, kube-public, and + // kube-node-lease namespaces. + // NOTE: Maybe we should make this configurable? It's not clear that + // we'd ever want to watch these namespaces. + Ignored: []string{ + NamespaceKubeSystem, + NamespaceKubePublic, + NamespaceKubeNodeLease, + }}, + ) + // We want to run the caches if the length of the namespaces slice is + // either 0 (watching all namespaces) or greater than 1 (watching multiple + // namespaces). + // + // The caches are only used for cross account resource management. If the + // controller is not configured to watch multiple namespaces, then we don't + // need to run the caches. + if len(namespaces) == 0 || len(namespaces) >= 2 { clusterConfig := mgr.GetConfig() clientSet, err := kubernetes.NewForConfig(clusterConfig) if err != nil { return err } + // Run the caches. This will not block as the caches are run in + // separate goroutines. cache.Run(clientSet) }