-
Notifications
You must be signed in to change notification settings - Fork 0
/
controller.go
122 lines (105 loc) · 3.97 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package main
import (
"fmt"
"time"
log "github.com/Sirupsen/logrus"
myresourceclientset "github.com/xiaoheigou/mycrd/pkg/client/clientset/versioned"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
type Controller struct {
logger *log.Entry
clientset kubernetes.Interface
myresourceClient *myresourceclientset.Clientset
queue workqueue.RateLimitingInterface
informer cache.SharedIndexInformer
handler Handler
}
func (c *Controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
go c.informer.Run(stopCh)
// do the initial synchronization (one time) to populate resources
if !cache.WaitForCacheSync(stopCh, c.HasSynced) {
utilruntime.HandleError(fmt.Errorf("Error syncing cache"))
return
}
c.logger.Info("Controller.Run: cache sync complete")
// run the runWorker method every second with a stop channel
wait.Until(c.runWorker, time.Second, stopCh)
}
// HasSynced allows us to satisfy the Controller interface
// by wiring up the informer's HasSynced method to it
func (c *Controller) HasSynced() bool {
return c.informer.HasSynced()
}
// runWorker executes the loop to process new items added to the queue
func (c *Controller) runWorker() {
log.Info("Controller.runWorker: starting")
// invoke processNextItem to fetch and consume the next change
// to a watched or listed resource
for c.processNextItem() {
log.Info("Controller.runWorker: processing next item")
}
log.Info("Controller.runWorker: completed")
}
// processNextItem retrieves each queued item and takes the
// necessary handler action based off of if the item was
// created or deleted
func (c *Controller) processNextItem() bool {
log.Info("Controller.processNextItem: start")
// fetch the next item (blocking) from the queue to process or
// if a shutdown is requested then return out of this to stop
// processing
key, quit := c.queue.Get()
// stop the worker loop from running as this indicates we
// have sent a shutdown message that the queue has indicated
// from the Get method
if quit {
return false
}
defer c.queue.Done(key)
// assert the string out of the key (format `namespace/name`)
keyRaw := key.(string)
// take the string key and get the object out of the indexer
//
// item will contain the complex object for the resource and
// exists is a bool that'll indicate whether or not the
// resource was created (true) or deleted (false)
//
// if there is an error in getting the key from the index
// then we want to retry this particular queue key a certain
// number of times (5 here) before we forget the queue key
// and throw an error
item, exists, err := c.informer.GetIndexer().GetByKey(keyRaw)
if err != nil {
if c.queue.NumRequeues(key) < 5 {
c.logger.Errorf("Controller.processNextItem: Failed processing item with key %s with error %v, retrying", key, err)
c.queue.AddRateLimited(key)
} else {
c.logger.Errorf("Controller.processNextItem: Failed processing item with key %s with error %v, no more retries", key, err)
c.queue.Forget(key)
utilruntime.HandleError(err)
}
}
// if the item doesn't exist then it was deleted and we need to fire off the handler's
// ObjectDeleted method. but if the object does exist that indicates that the object
// was created (or updated) so run the ObjectCreated method
//
// after both instances, we want to forget the key from the queue, as this indicates
// a code path of successful queue key processing
if !exists {
c.logger.Infof("Controller.processNextItem: object deleted detected: %s", keyRaw)
c.handler.ObjectDeleted(item)
c.queue.Forget(key)
} else {
c.logger.Infof("Controller.processNextItem: object created detected: %s", keyRaw)
c.handler.ObjectCreated(item, c.clientset, c.myresourceClient)
c.queue.Forget(key)
}
// keep the worker loop running by returning true
return true
}