@@ -22,15 +22,15 @@ import (
22
22
"go.uber.org/zap"
23
23
"k8s.io/apimachinery/pkg/labels"
24
24
"k8s.io/client-go/tools/cache"
25
- "knative.dev/eventing/pkg/apis/eventing"
26
- eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
25
+ apiseventing "knative.dev/eventing/pkg/apis/eventing"
26
+ eventing "knative.dev/eventing/pkg/apis/eventing/v1"
27
27
eventingclient "knative.dev/eventing/pkg/client/injection/client"
28
28
brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker"
29
29
triggerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger"
30
30
subscriptioninformer "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription"
31
31
brokerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/broker"
32
32
triggerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/trigger"
33
- v1 "knative.dev/eventing/pkg/client/listers/eventing/v1"
33
+ eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1"
34
34
"knative.dev/eventing/pkg/duck"
35
35
"knative.dev/pkg/client/injection/ducks/duck/v1/source"
36
36
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap"
@@ -63,20 +63,27 @@ func NewController(
63
63
triggerLister : triggerLister ,
64
64
configmapLister : configmapInformer .Lister (),
65
65
}
66
- impl := triggerreconciler .NewImpl (ctx , r )
66
+ impl := triggerreconciler .NewImpl (ctx , r , func (impl * controller.Impl ) controller.Options {
67
+ return controller.Options {
68
+ PromoteFilterFunc : filterTriggers (r .brokerLister ),
69
+ }
70
+ })
67
71
r .impl = impl
68
72
69
73
r .sourceTracker = duck .NewListableTrackerFromTracker (ctx , source .Get , impl .Tracker )
70
74
r .uriResolver = resolver .NewURIResolverFromTracker (ctx , impl .Tracker )
71
75
72
- triggerInformer .Informer ().AddEventHandler (controller .HandleAll (impl .Enqueue ))
76
+ triggerInformer .Informer ().AddEventHandler (cache.FilteringResourceEventHandler {
77
+ FilterFunc : filterTriggers (r .brokerLister ),
78
+ Handler : controller .HandleAll (impl .Enqueue ),
79
+ })
73
80
74
81
// Filter Brokers and enqueue associated Triggers
75
- brokerFilter := pkgreconciler .AnnotationFilterFunc (brokerreconciler .ClassAnnotationKey , eventing .MTChannelBrokerClassValue , false /*allowUnset*/ )
82
+ brokerFilter := pkgreconciler .AnnotationFilterFunc (brokerreconciler .ClassAnnotationKey , apiseventing .MTChannelBrokerClassValue , false /*allowUnset*/ )
76
83
brokerInformer .Informer ().AddEventHandler (cache.FilteringResourceEventHandler {
77
84
FilterFunc : brokerFilter ,
78
85
Handler : controller .HandleAll (func (obj interface {}) {
79
- if broker , ok := obj .(* eventingv1 .Broker ); ok {
86
+ if broker , ok := obj .(* eventing .Broker ); ok {
80
87
for _ , t := range getTriggersForBroker (logger , triggerLister , broker ) {
81
88
impl .Enqueue (t )
82
89
}
@@ -86,20 +93,39 @@ func NewController(
86
93
87
94
// Reconcile Trigger when my Subscription changes
88
95
subscriptionInformer .Informer ().AddEventHandler (cache.FilteringResourceEventHandler {
89
- FilterFunc : controller .FilterController (& eventingv1 .Trigger {}),
96
+ FilterFunc : controller .FilterController (& eventing .Trigger {}),
90
97
Handler : controller .HandleAll (impl .EnqueueControllerOf ),
91
98
})
92
99
93
100
return impl
94
101
}
95
102
103
+ // filterTriggers returns a function that returns true if the resource passed
104
+ // is a trigger pointing to a MTChannelBroker.
105
+ func filterTriggers (lister eventinglisters.BrokerLister ) func (interface {}) bool {
106
+ return func (obj interface {}) bool {
107
+ trigger , ok := obj .(* eventing.Trigger )
108
+ if ! ok {
109
+ return false
110
+ }
111
+
112
+ b , err := lister .Brokers (trigger .Namespace ).Get (trigger .Spec .Broker )
113
+ if err != nil {
114
+ return false
115
+ }
116
+
117
+ value , ok := b .GetAnnotations ()[apiseventing .BrokerClassKey ]
118
+ return ok && value == apiseventing .MTChannelBrokerClassValue
119
+ }
120
+ }
121
+
96
122
// getTriggersForBroker makes sure the object passed in is a Broker, and gets all
97
123
// the Triggers belonging to it. As there is no way to return failures in the
98
124
// Informers EventHandler, errors are logged, and an empty array is returned in case
99
125
// of failures.
100
- func getTriggersForBroker (logger * zap.SugaredLogger , triggerLister v1 .TriggerLister , broker * eventingv1 .Broker ) []* eventingv1 .Trigger {
101
- r := make ([]* eventingv1 .Trigger , 0 )
102
- selector := labels .SelectorFromSet (map [string ]string {eventing .BrokerLabelKey : broker .Name })
126
+ func getTriggersForBroker (logger * zap.SugaredLogger , triggerLister eventinglisters .TriggerLister , broker * eventing .Broker ) []* eventing .Trigger {
127
+ r := make ([]* eventing .Trigger , 0 )
128
+ selector := labels .SelectorFromSet (map [string ]string {apiseventing .BrokerLabelKey : broker .Name })
103
129
triggers , err := triggerLister .Triggers (broker .Namespace ).List (selector )
104
130
if err != nil {
105
131
logger .Warn ("Failed to list triggers" , zap .Any ("broker" , broker ), zap .Error (err ))
0 commit comments