Skip to content

Commit 9e99f31

Browse files
committed
[feat aga] Implement resource monitoring for referenced resources
1 parent 3a1d60a commit 9e99f31

File tree

9 files changed

+1662
-5
lines changed

9 files changed

+1662
-5
lines changed
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package eventhandlers
2+
3+
import (
4+
"context"
5+
"github.com/go-logr/logr"
6+
corev1 "k8s.io/api/core/v1"
7+
networking "k8s.io/api/networking/v1"
8+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
9+
"k8s.io/apimachinery/pkg/types"
10+
"k8s.io/client-go/util/workqueue"
11+
"sigs.k8s.io/aws-load-balancer-controller/pkg/aga"
12+
"sigs.k8s.io/controller-runtime/pkg/client"
13+
"sigs.k8s.io/controller-runtime/pkg/event"
14+
"sigs.k8s.io/controller-runtime/pkg/handler"
15+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
16+
gwv1 "sigs.k8s.io/gateway-api/apis/v1"
17+
)
18+
19+
// NewEnqueueRequestsForResourceEvent creates a new handler for generic resource events
20+
func NewEnqueueRequestsForResourceEvent(
21+
resourceType aga.ResourceType,
22+
referenceTracker *aga.ReferenceTracker,
23+
logger logr.Logger,
24+
) handler.EventHandler {
25+
return &enqueueRequestsForResourceEvent{
26+
resourceType: resourceType,
27+
referenceTracker: referenceTracker,
28+
logger: logger,
29+
}
30+
}
31+
32+
// enqueueRequestsForResourceEvent handles resource events and enqueues reconcile requests for GlobalAccelerators
33+
// that reference the resource
34+
type enqueueRequestsForResourceEvent struct {
35+
resourceType aga.ResourceType
36+
referenceTracker *aga.ReferenceTracker
37+
logger logr.Logger
38+
}
39+
40+
// The following methods implement handler.TypedEventHandler interface
41+
42+
// Create handles Create events with the typed API
43+
func (h *enqueueRequestsForResourceEvent) Create(ctx context.Context, evt event.TypedCreateEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
44+
h.handleResource(ctx, evt.Object, "created", queue)
45+
}
46+
47+
// Update handles Update events with the typed API
48+
func (h *enqueueRequestsForResourceEvent) Update(ctx context.Context, evt event.TypedUpdateEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
49+
h.handleResource(ctx, evt.ObjectNew, "updated", queue)
50+
}
51+
52+
// Delete handles Delete events with the typed API
53+
func (h *enqueueRequestsForResourceEvent) Delete(ctx context.Context, evt event.TypedDeleteEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
54+
h.handleResource(ctx, evt.Object, "deleted", queue)
55+
}
56+
57+
// Generic handles Generic events with the typed API
58+
func (h *enqueueRequestsForResourceEvent) Generic(ctx context.Context, evt event.TypedGenericEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
59+
h.handleResource(ctx, evt.Object, "generic event", queue)
60+
}
61+
62+
// handleTypedResource handles resource events for the typed interface
63+
func (h *enqueueRequestsForResourceEvent) handleResource(_ context.Context, obj interface{}, eventType string, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
64+
var namespace, name string
65+
66+
// Extract namespace and name based on the object type
67+
switch res := obj.(type) {
68+
case *corev1.Service:
69+
namespace = res.Namespace
70+
name = res.Name
71+
case *networking.Ingress:
72+
namespace = res.Namespace
73+
name = res.Name
74+
case *gwv1.Gateway:
75+
namespace = res.Namespace
76+
name = res.Name
77+
case *unstructured.Unstructured:
78+
namespace = res.GetNamespace()
79+
name = res.GetName()
80+
default:
81+
h.logger.Error(nil, "Unknown resource type", "type", h.resourceType)
82+
return
83+
}
84+
85+
resourceKey := aga.ResourceKey{
86+
Type: h.resourceType,
87+
Name: types.NamespacedName{
88+
Namespace: namespace,
89+
Name: name,
90+
},
91+
}
92+
93+
// If this resource is not referenced by any GA, no need to queue reconciles
94+
if !h.referenceTracker.IsResourceReferenced(resourceKey) {
95+
return
96+
}
97+
98+
// Get all GAs that reference this resource
99+
gaRefs := h.referenceTracker.GetGAsForResource(resourceKey)
100+
101+
// Queue reconcile for affected GAs
102+
for _, gaRef := range gaRefs {
103+
h.logger.V(1).Info("Enqueueing GA for reconcile due to resource event",
104+
"resourceType", h.resourceType,
105+
"resourceName", resourceKey.Name,
106+
"eventType", eventType,
107+
"ga", gaRef)
108+
109+
queue.Add(reconcile.Request{NamespacedName: gaRef})
110+
}
111+
}

controllers/aga/globalaccelerator_controller.go

Lines changed: 146 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,12 @@ import (
3535
ctrl "sigs.k8s.io/controller-runtime"
3636
"sigs.k8s.io/controller-runtime/pkg/client"
3737
"sigs.k8s.io/controller-runtime/pkg/controller"
38+
"sigs.k8s.io/controller-runtime/pkg/event"
3839
"sigs.k8s.io/controller-runtime/pkg/reconcile"
40+
"sigs.k8s.io/controller-runtime/pkg/source"
3941

4042
agaapi "sigs.k8s.io/aws-load-balancer-controller/apis/aga/v1beta1"
43+
"sigs.k8s.io/aws-load-balancer-controller/controllers/aga/eventhandlers"
4144
"sigs.k8s.io/aws-load-balancer-controller/pkg/aga"
4245
"sigs.k8s.io/aws-load-balancer-controller/pkg/config"
4346
"sigs.k8s.io/aws-load-balancer-controller/pkg/deploy"
@@ -50,6 +53,7 @@ import (
5053
"sigs.k8s.io/aws-load-balancer-controller/pkg/model/core"
5154
"sigs.k8s.io/aws-load-balancer-controller/pkg/runtime"
5255
agastatus "sigs.k8s.io/aws-load-balancer-controller/pkg/status/aga"
56+
gwclientset "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"
5357
)
5458

5559
const (
@@ -64,6 +68,9 @@ const (
6468
requeueMessage = "Monitoring provisioning state"
6569
statusUpdateRequeueTime = 1 * time.Minute
6670

71+
// Status reason constants
72+
EndpointLoadFailed = "EndpointLoadFailed"
73+
6774
// Metric stage constants
6875
MetricStageFetchGlobalAccelerator = "fetch_globalAccelerator"
6976
MetricStageAddFinalizers = "add_finalizers"
@@ -108,6 +115,18 @@ func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder recor
108115
// Create status updater
109116
statusUpdater := agastatus.NewStatusUpdater(k8sClient, logger)
110117

118+
// Create reference tracker for endpoint tracking
119+
referenceTracker := aga.NewReferenceTracker(logger.WithName("reference-tracker"))
120+
121+
// Create DNS resolver
122+
dnsResolver, err := aga.NewDNSResolver(cloud.ELBV2())
123+
if err != nil {
124+
logger.Error(err, "Failed to create DNS resolver")
125+
}
126+
127+
// Create unified endpoint loader
128+
endpointLoader := aga.NewEndpointLoader(k8sClient, dnsResolver, logger.WithName("endpoint-loader"))
129+
111130
return &globalAcceleratorReconciler{
112131
k8sClient: k8sClient,
113132
eventRecorder: eventRecorder,
@@ -120,6 +139,13 @@ func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder recor
120139
metricsCollector: metricsCollector,
121140
reconcileTracker: reconcileCounters.IncrementAGA,
122141

142+
// Components for endpoint reference tracking
143+
referenceTracker: referenceTracker,
144+
dnsResolver: dnsResolver,
145+
146+
// Unified endpoint loader
147+
endpointLoader: endpointLoader,
148+
123149
maxConcurrentReconciles: config.GlobalAcceleratorMaxConcurrentReconciles,
124150
maxExponentialBackoffDelay: config.GlobalAcceleratorMaxExponentialBackoffDelay,
125151
}
@@ -138,6 +164,21 @@ type globalAcceleratorReconciler struct {
138164
metricsCollector lbcmetrics.MetricCollector
139165
reconcileTracker func(namespaceName ktypes.NamespacedName)
140166

167+
// Components for endpoint reference tracking
168+
referenceTracker *aga.ReferenceTracker
169+
dnsResolver *aga.DNSResolver
170+
171+
// Unified endpoint loader
172+
endpointLoader aga.EndpointLoader
173+
174+
// Resources manager for dedicated endpoint resource watchers
175+
endpointResourcesManager aga.EndpointResourcesManager
176+
177+
// Event channels for dedicated watchers
178+
serviceEventChan chan event.GenericEvent
179+
ingressEventChan chan event.GenericEvent
180+
gatewayEventChan chan event.GenericEvent
181+
141182
maxConcurrentReconciles int
142183
maxExponentialBackoffDelay time.Duration
143184
}
@@ -194,6 +235,13 @@ func (r *globalAcceleratorReconciler) reconcileGlobalAccelerator(ctx context.Con
194235

195236
func (r *globalAcceleratorReconciler) cleanupGlobalAccelerator(ctx context.Context, ga *agaapi.GlobalAccelerator) error {
196237
if k8s.HasFinalizer(ga, shared_constants.GlobalAcceleratorFinalizer) {
238+
// Clean up references in the reference tracker
239+
gaKey := k8s.NamespacedName(ga)
240+
r.referenceTracker.RemoveGA(gaKey)
241+
242+
// Clean up resource watches
243+
r.endpointResourcesManager.RemoveGA(gaKey)
244+
197245
// TODO: Implement cleanup logic for AWS Global Accelerator resources (Only cleaning up accelerator for now)
198246
if err := r.cleanupGlobalAcceleratorResources(ctx, ga); err != nil {
199247
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedCleanup, fmt.Sprintf("Failed cleanup due to %v", err))
@@ -224,6 +272,29 @@ func (r *globalAcceleratorReconciler) buildModel(ctx context.Context, ga *agaapi
224272

225273
func (r *globalAcceleratorReconciler) reconcileGlobalAcceleratorResources(ctx context.Context, ga *agaapi.GlobalAccelerator) error {
226274
r.logger.Info("Reconciling GlobalAccelerator resources", "globalAccelerator", k8s.NamespacedName(ga))
275+
276+
// Get all endpoints from GA
277+
endpoints := aga.GetAllEndpointsFromGA(ga)
278+
279+
// Track referenced endpoints
280+
r.referenceTracker.UpdateReferencesForGA(ga, endpoints)
281+
282+
// Update resource watches with the endpointResourcesManager
283+
r.endpointResourcesManager.MonitorEndpointResources(ga, endpoints)
284+
285+
// Validate and load endpoint status using the endpoint loader
286+
_, fatalErrors := r.endpointLoader.LoadEndpoints(ctx, ga, endpoints)
287+
if len(fatalErrors) > 0 {
288+
err := fmt.Errorf("failed to load endpoints: %v", fatalErrors[0])
289+
r.logger.Error(err, "Fatal error loading endpoints")
290+
291+
// Handle other endpoint loading errors
292+
if statusErr := r.statusUpdater.UpdateStatusFailure(ctx, ga, EndpointLoadFailed, err.Error()); statusErr != nil {
293+
r.logger.Error(statusErr, "Failed to update GlobalAccelerator status after endpoint load failure")
294+
}
295+
return err
296+
}
297+
227298
var stack core.Stack
228299
var accelerator *agamodel.Accelerator
229300
var err error
@@ -335,21 +406,91 @@ func (r *globalAcceleratorReconciler) SetupWithManager(ctx context.Context, mgr
335406
return nil
336407
}
337408

338-
if err := r.setupIndexes(ctx, mgr.GetFieldIndexer()); err != nil {
409+
// Create event channels for dedicated watchers
410+
r.serviceEventChan = make(chan event.GenericEvent)
411+
r.ingressEventChan = make(chan event.GenericEvent)
412+
r.gatewayEventChan = make(chan event.GenericEvent)
413+
414+
// Initialize Gateway API client using the same config
415+
gwClient, err := gwclientset.NewForConfig(mgr.GetConfig())
416+
if err != nil {
417+
r.logger.Error(err, "Failed to create Gateway API client")
339418
return err
340419
}
341420

342-
// TODO: Add event handlers for Services, Ingresses, and Gateways
343-
// that are referenced by GlobalAccelerator endpoints
421+
// Initialize the endpoint resources manager with clients
422+
r.endpointResourcesManager = aga.NewEndpointResourcesManager(
423+
clientSet,
424+
gwClient,
425+
r.serviceEventChan,
426+
r.ingressEventChan,
427+
r.gatewayEventChan,
428+
r.logger.WithName("endpoint-resources-manager"),
429+
)
344430

345-
return ctrl.NewControllerManagedBy(mgr).
431+
if err := r.setupIndexes(ctx, mgr.GetFieldIndexer()); err != nil {
432+
return err
433+
}
434+
435+
// Set up the controller builder
436+
ctrl, err := ctrl.NewControllerManagedBy(mgr).
346437
For(&agaapi.GlobalAccelerator{}).
347438
Named(controllerName).
348439
WithOptions(controller.Options{
349440
MaxConcurrentReconciles: r.maxConcurrentReconciles,
350441
RateLimiter: workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](5*time.Second, r.maxExponentialBackoffDelay),
351442
}).
352-
Complete(r)
443+
Build(r)
444+
445+
if err != nil {
446+
return err
447+
}
448+
449+
// Setup watches for resource events
450+
if err := r.setupGlobalAcceleratorWatches(ctrl); err != nil {
451+
return err
452+
}
453+
454+
return nil
455+
}
456+
457+
// setupGlobalAcceleratorWatches sets up watches for resources that can trigger reconciliation of GlobalAccelerator objects
458+
func (r *globalAcceleratorReconciler) setupGlobalAcceleratorWatches(c controller.Controller) error {
459+
loggerPrefix := r.logger.WithName("eventHandlers")
460+
461+
// Create handlers for our dedicated watchers
462+
serviceHandler := eventhandlers.NewEnqueueRequestsForResourceEvent(
463+
aga.ServiceResourceType,
464+
r.referenceTracker,
465+
loggerPrefix.WithName("service-handler"),
466+
)
467+
468+
ingressHandler := eventhandlers.NewEnqueueRequestsForResourceEvent(
469+
aga.IngressResourceType,
470+
r.referenceTracker,
471+
loggerPrefix.WithName("ingress-handler"),
472+
)
473+
474+
gatewayHandler := eventhandlers.NewEnqueueRequestsForResourceEvent(
475+
aga.GatewayResourceType,
476+
r.referenceTracker,
477+
loggerPrefix.WithName("gateway-handler"),
478+
)
479+
480+
// Add watches using the channel sources with event handlers
481+
if err := c.Watch(source.Channel(r.serviceEventChan, serviceHandler)); err != nil {
482+
return err
483+
}
484+
485+
if err := c.Watch(source.Channel(r.ingressEventChan, ingressHandler)); err != nil {
486+
return err
487+
}
488+
489+
if err := c.Watch(source.Channel(r.gatewayEventChan, gatewayHandler)); err != nil {
490+
return err
491+
}
492+
493+
return nil
353494
}
354495

355496
func (r *globalAcceleratorReconciler) setupIndexes(ctx context.Context, fieldIndexer client.FieldIndexer) error {

0 commit comments

Comments
 (0)