Skip to content

Commit c995b77

Browse files
author
Oluwole Fadeyi
authored
Merge pull request #308 from jetstack/refactoring-dg-IV
Refactor k8d/dynamic datagatherer The k8s/dynamic is now going to use a sharedIndexinformer for k8s native v1 resources, instead of using a dynamicinformer. This reduces the current memory usage by about 25MB.
2 parents 2bffc7c + 8cef113 commit c995b77

File tree

8 files changed

+615
-122
lines changed

8 files changed

+615
-122
lines changed

pkg/datagatherer/k8s/cache.go

Lines changed: 28 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66

77
"github.com/jetstack/preflight/api"
88
"github.com/pmylund/go-cache"
9-
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
9+
"k8s.io/apimachinery/pkg/types"
1010
)
1111

1212
// time interface, this is used to fetch the current time
@@ -24,60 +24,52 @@ func (*realTime) now() time.Time {
2424
return time.Now()
2525
}
2626

27+
type cacheResource interface {
28+
GetUID() types.UID
29+
GetNamespace() string
30+
}
31+
2732
// onAdd handles the informer creation events, adding the created runtime.Object
2833
// to the data gatherer's cache. The cache key is the uid of the object
2934
func onAdd(obj interface{}, dgCache *cache.Cache) {
30-
item := obj.(*unstructured.Unstructured)
31-
if metadata, ok := item.Object["metadata"]; ok {
32-
data := metadata.(map[string]interface{})
33-
if uid, ok := data["uid"]; ok {
34-
cacheObject := &api.GatheredResource{
35-
Resource: obj,
36-
}
37-
dgCache.Set(uid.(string), cacheObject, cache.DefaultExpiration)
38-
} else {
39-
log.Printf("could not %q resource %q to the cache, missing uid field", "add", data["name"].(string))
35+
item, ok := obj.(cacheResource)
36+
if ok {
37+
cacheObject := &api.GatheredResource{
38+
Resource: obj,
4039
}
41-
} else {
42-
log.Printf("could not %q resource to the cache, missing metadata", "add")
40+
dgCache.Set(string(item.GetUID()), cacheObject, cache.DefaultExpiration)
41+
return
4342
}
43+
log.Printf("could not %q resource to the cache, missing metadata/uid field", "add")
44+
4445
}
4546

4647
// onUpdate handles the informer update events, replacing the old object with the new one
4748
// if it's present in the data gatherer's cache, (if the object isn't present, it gets added).
4849
// The cache key is the uid of the object
4950
func onUpdate(old, new interface{}, dgCache *cache.Cache) {
50-
item := old.(*unstructured.Unstructured)
51-
if metadata, ok := item.Object["metadata"]; ok {
52-
data := metadata.(map[string]interface{})
53-
if uid, ok := data["uid"]; ok {
54-
cacheObject := updateCacheGatheredResource(uid.(string), new, dgCache)
55-
dgCache.Set(uid.(string), cacheObject, cache.DefaultExpiration)
56-
} else {
57-
log.Printf("could not %q resource %q to the cache, missing uid field", "update", data["name"].(string))
58-
}
59-
} else {
60-
log.Printf("could not %q resource to the cache, missing metadata", "update")
51+
item, ok := old.(cacheResource)
52+
if ok {
53+
cacheObject := updateCacheGatheredResource(string(item.GetUID()), new, dgCache)
54+
dgCache.Set(string(item.GetUID()), cacheObject, cache.DefaultExpiration)
55+
return
6156
}
57+
58+
log.Printf("could not %q resource to the cache, missing metadata/uid field", "update")
6259
}
6360

6461
// onDelete handles the informer deletion events, updating the object's properties with the deletion
6562
// time of the object (but not removing the object from the cache).
6663
// The cache key is the uid of the object
6764
func onDelete(obj interface{}, dgCache *cache.Cache) {
68-
item := obj.(*unstructured.Unstructured)
69-
if metadata, ok := item.Object["metadata"]; ok {
70-
data := metadata.(map[string]interface{})
71-
if uid, ok := data["uid"]; ok {
72-
cacheObject := updateCacheGatheredResource(uid.(string), obj, dgCache)
73-
cacheObject.DeletedAt = api.Time{Time: clock.now()}
74-
dgCache.Set(uid.(string), cacheObject, cache.DefaultExpiration)
75-
} else {
76-
log.Printf("could not %q resource %q to the cache, missing uid field", "delete", data["name"].(string))
77-
}
78-
} else {
79-
log.Printf("could not %q resource to the cache, missing metadata", "delete")
65+
item, ok := obj.(cacheResource)
66+
if ok {
67+
cacheObject := updateCacheGatheredResource(string(item.GetUID()), obj, dgCache)
68+
cacheObject.DeletedAt = api.Time{Time: clock.now()}
69+
dgCache.Set(string(item.GetUID()), cacheObject, cache.DefaultExpiration)
70+
return
8071
}
72+
log.Printf("could not %q resource to the cache, missing metadata/uid field", "delete")
8173
}
8274

8375
// creates a new updated instance of a cache object, with the resource

pkg/datagatherer/k8s/client.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"github.com/pkg/errors"
66
"k8s.io/client-go/discovery"
77
"k8s.io/client-go/dynamic"
8+
"k8s.io/client-go/kubernetes"
89
"k8s.io/client-go/rest"
910
"k8s.io/client-go/tools/clientcmd"
1011
)
@@ -43,6 +44,22 @@ func NewDiscoveryClient(kubeconfigPath string) (discovery.DiscoveryClient, error
4344
return *discoveryClient, nil
4445
}
4546

47+
// NewClientSet creates a new kubernetes clientset using the provided kubeconfig.
48+
// If kubeconfigPath is not set/empty, it will attempt to load configuration using
49+
// the default loading rules.
50+
func NewClientSet(kubeconfigPath string) (kubernetes.Interface, error) {
51+
var clientset *kubernetes.Clientset
52+
cfg, err := loadRESTConfig(kubeconfigPath)
53+
if err != nil {
54+
return nil, errors.WithStack(err)
55+
}
56+
clientset, err = kubernetes.NewForConfig(cfg)
57+
if err != nil {
58+
return nil, errors.WithStack(err)
59+
}
60+
return clientset, nil
61+
}
62+
4663
func loadRESTConfig(path string) (*rest.Config, error) {
4764
switch path {
4865
// If the kubeconfig path is not provided, use the default loading rules

0 commit comments

Comments
 (0)