Skip to content

Commit b65911e

Browse files
authored
[feat aga] Add AGA listener support without auto-discovery (#4436)
* [feat aga] Add AGA listener builder without auto-discovery * [feat aga] Add AGA listener deployer with clean up
1 parent 1c2fbc4 commit b65911e

31 files changed

+5738
-96
lines changed

apis/aga/v1beta1/globalaccelerator_types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ const (
4848
)
4949

5050
// PortRange defines the port range for Global Accelerator listeners.
51+
// +kubebuilder:validation:XValidation:rule="self.fromPort <= self.toPort",message="FromPort must be less than or equal to ToPort"
5152
type PortRange struct {
5253
// FromPort is the first port in the range of ports, inclusive.
5354
// +kubebuilder:validation:Minimum=1

config/crd/aga/aga-crds.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,9 @@ spec:
264264
- fromPort
265265
- toPort
266266
type: object
267+
x-kubernetes-validations:
268+
- message: FromPort must be less than or equal to ToPort
269+
rule: self.fromPort <= self.toPort
267270
maxItems: 10
268271
minItems: 1
269272
type: array

config/crd/aga/aga.k8s.aws_globalaccelerators.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,9 @@ spec:
264264
- fromPort
265265
- toPort
266266
type: object
267+
x-kubernetes-validations:
268+
- message: FromPort must be less than or equal to ToPort
269+
rule: self.fromPort <= self.toPort
267270
maxItems: 10
268271
minItems: 1
269272
type: array
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# This patch adds the GlobalAccelerator validator webhook configuration to the webhook configurations
2+
apiVersion: admissionregistration.k8s.io/v1
3+
kind: ValidatingWebhookConfiguration
4+
metadata:
5+
name: webhook-configuration
6+
webhooks:
7+
- name: vglobalaccelerator.aga.k8s.aws
8+
rules:
9+
- apiGroups:
10+
- "aga.k8s.aws"
11+
apiVersions:
12+
- v1beta1
13+
operations:
14+
- CREATE
15+
- UPDATE
16+
resources:
17+
- globalaccelerators
18+
scope: "Namespaced"

config/webhook/kustomization.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,4 @@ patchesStrategicMerge:
99
- pod_mutator_patch.yaml
1010
- service_mutator_patch.yaml
1111
- ingressclassparams_validator_patch.yaml
12+
- globalaccelerator_validator_patch.yaml

config/webhook/manifests.yaml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,27 @@ kind: ValidatingWebhookConfiguration
8787
metadata:
8888
name: webhook
8989
webhooks:
90+
- admissionReviewVersions:
91+
- v1beta1
92+
clientConfig:
93+
service:
94+
name: webhook-service
95+
namespace: system
96+
path: /validate-aga-k8s-aws-v1beta1-globalaccelerator
97+
failurePolicy: Fail
98+
matchPolicy: Equivalent
99+
name: vglobalaccelerator.aga.k8s.aws
100+
rules:
101+
- apiGroups:
102+
- aga.k8s.aws
103+
apiVersions:
104+
- v1beta1
105+
operations:
106+
- CREATE
107+
- UPDATE
108+
resources:
109+
- globalaccelerators
110+
sideEffects: None
90111
- admissionReviewVersions:
91112
- v1beta1
92113
clientConfig:

controllers/aga/globalaccelerator_controller.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,9 @@ func (r *globalAcceleratorReconciler) reconcileGlobalAcceleratorResources(ctx co
275275
func (r *globalAcceleratorReconciler) cleanupGlobalAcceleratorResources(ctx context.Context, ga *agaapi.GlobalAccelerator) error {
276276
r.logger.Info("Cleaning up GlobalAccelerator resources", "globalAccelerator", k8s.NamespacedName(ga))
277277

278-
// TODO we will handle cleaning up dependent resources when we implement those
278+
// Our enhanced AcceleratorManager now handles deletion of listeners before accelerator.
279+
// TODO: This will be enhanced to delete endpoint groups and endpoints
280+
// before deleting listeners and accelerator (when those features are implemented)
279281
// 1. Find the accelerator ARN from the CRD status
280282
if ga.Status.AcceleratorARN == nil {
281283
r.logger.Info("No accelerator ARN found in status, nothing to clean up", "globalAccelerator", k8s.NamespacedName(ga))

helm/aws-load-balancer-controller/crds/aga-crds.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,9 @@ spec:
264264
- fromPort
265265
- toPort
266266
type: object
267+
x-kubernetes-validations:
268+
- message: FromPort must be less than or equal to ToPort
269+
rule: self.fromPort <= self.toPort
267270
maxItems: 10
268271
minItems: 1
269272
type: array

main.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"fmt"
2222
"k8s.io/apimachinery/pkg/util/sets"
2323
"os"
24+
"sigs.k8s.io/aws-load-balancer-controller/pkg/aga"
2425
"sigs.k8s.io/aws-load-balancer-controller/pkg/shared_utils"
2526

2627
elbv2gw "sigs.k8s.io/aws-load-balancer-controller/apis/gateway/v1beta1"
@@ -66,6 +67,7 @@ import (
6667
"sigs.k8s.io/aws-load-balancer-controller/pkg/runtime"
6768
"sigs.k8s.io/aws-load-balancer-controller/pkg/targetgroupbinding"
6869
"sigs.k8s.io/aws-load-balancer-controller/pkg/version"
70+
agawebhook "sigs.k8s.io/aws-load-balancer-controller/webhooks/aga"
6971
corewebhook "sigs.k8s.io/aws-load-balancer-controller/webhooks/core"
7072
elbv2webhook "sigs.k8s.io/aws-load-balancer-controller/webhooks/elbv2"
7173
networkingwebhook "sigs.k8s.io/aws-load-balancer-controller/webhooks/networking"
@@ -237,7 +239,7 @@ func main() {
237239
}
238240

239241
// Setup GlobalAccelerator controller only if enabled
240-
if shared_utils.IsAGAControllerEnabled(controllerCFG.FeatureGates, controllerCFG.AWSConfig.Region) {
242+
if aga.IsAGAControllerEnabled(controllerCFG.FeatureGates, controllerCFG.AWSConfig.Region) {
241243
agaReconciler := agacontroller.NewGlobalAcceleratorReconciler(mgr.GetClient(), mgr.GetEventRecorderFor("globalAccelerator"),
242244
finalizerManager, controllerCFG, cloud, ctrl.Log.WithName("controllers").WithName("globalAccelerator"), lbcMetricsCollector, reconcileCounters)
243245
if err := agaReconciler.SetupWithManager(ctx, mgr, clientSet); err != nil {
@@ -420,6 +422,11 @@ func main() {
420422
elbv2webhook.NewTargetGroupBindingMutator(cloud.ELBV2(), ctrl.Log, lbcMetricsCollector).SetupWithManager(mgr)
421423
elbv2webhook.NewTargetGroupBindingValidator(mgr.GetClient(), cloud.ELBV2(), cloud.VpcID(), ctrl.Log, lbcMetricsCollector).SetupWithManager(mgr)
422424
networkingwebhook.NewIngressValidator(mgr.GetClient(), controllerCFG.IngressConfig, ctrl.Log, lbcMetricsCollector).SetupWithManager(mgr)
425+
426+
// Setup GlobalAccelerator validator only if enabled
427+
if aga.IsAGAControllerEnabled(controllerCFG.FeatureGates, controllerCFG.AWSConfig.Region) {
428+
agawebhook.NewGlobalAcceleratorValidator(ctrl.Log, lbcMetricsCollector).SetupWithManager(mgr)
429+
}
423430
//+kubebuilder:scaffold:builder
424431

425432
go func() {

pkg/aga/model_build_listener.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package aga
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/pkg/errors"
7+
agaapi "sigs.k8s.io/aws-load-balancer-controller/apis/aga/v1beta1"
8+
agamodel "sigs.k8s.io/aws-load-balancer-controller/pkg/model/aga"
9+
"sigs.k8s.io/aws-load-balancer-controller/pkg/model/core"
10+
)
11+
12+
// listenerBuilder builds Listener model resources
13+
type listenerBuilder interface {
14+
Build(ctx context.Context, stack core.Stack, accelerator *agamodel.Accelerator, listeners []agaapi.GlobalAcceleratorListener) ([]*agamodel.Listener, error)
15+
}
16+
17+
// NewListenerBuilder constructs new listenerBuilder
18+
func NewListenerBuilder() listenerBuilder {
19+
return &defaultListenerBuilder{}
20+
}
21+
22+
var _ listenerBuilder = &defaultListenerBuilder{}
23+
24+
type defaultListenerBuilder struct{}
25+
26+
// Build builds Listener model resources
27+
func (b *defaultListenerBuilder) Build(ctx context.Context, stack core.Stack, accelerator *agamodel.Accelerator, listeners []agaapi.GlobalAcceleratorListener) ([]*agamodel.Listener, error) {
28+
if listeners == nil || len(listeners) == 0 {
29+
return nil, nil
30+
}
31+
32+
var result []*agamodel.Listener
33+
for i, listener := range listeners {
34+
listenerModel, err := buildListener(ctx, stack, accelerator, listener, i)
35+
if err != nil {
36+
return nil, err
37+
}
38+
result = append(result, listenerModel)
39+
}
40+
return result, nil
41+
}
42+
43+
// buildListener builds a single Listener model resource
44+
func buildListener(ctx context.Context, stack core.Stack, accelerator *agamodel.Accelerator, listener agaapi.GlobalAcceleratorListener, index int) (*agamodel.Listener, error) {
45+
spec, err := buildListenerSpec(ctx, accelerator, listener)
46+
if err != nil {
47+
return nil, err
48+
}
49+
50+
resourceID := fmt.Sprintf("Listener-%d", index)
51+
listenerModel := agamodel.NewListener(stack, resourceID, spec, accelerator)
52+
return listenerModel, nil
53+
}
54+
55+
// buildListenerSpec builds the ListenerSpec for a single Listener model resource
56+
func buildListenerSpec(ctx context.Context, accelerator *agamodel.Accelerator, listener agaapi.GlobalAcceleratorListener) (agamodel.ListenerSpec, error) {
57+
protocol, err := buildListenerProtocol(ctx, listener)
58+
if err != nil {
59+
return agamodel.ListenerSpec{}, err
60+
}
61+
62+
portRanges, err := buildListenerPortRanges(ctx, listener)
63+
if err != nil {
64+
return agamodel.ListenerSpec{}, err
65+
}
66+
67+
clientAffinity := buildListenerClientAffinity(ctx, listener)
68+
69+
return agamodel.ListenerSpec{
70+
AcceleratorARN: accelerator.AcceleratorARN(),
71+
Protocol: protocol,
72+
PortRanges: portRanges,
73+
ClientAffinity: clientAffinity,
74+
}, nil
75+
}
76+
77+
// buildListenerProtocol determines the protocol for the listener
78+
func buildListenerProtocol(_ context.Context, listener agaapi.GlobalAcceleratorListener) (agamodel.Protocol, error) {
79+
if listener.Protocol == nil {
80+
// TODO: Auto-discovery feature - Auto-determine protocol from endpoints if nil
81+
// Return error until auto-discovery feature is implemented
82+
return "", errors.New("listener protocol must be specified (auto-discovery not yet implemented)")
83+
}
84+
85+
switch *listener.Protocol {
86+
case agaapi.GlobalAcceleratorProtocolTCP:
87+
return agamodel.ProtocolTCP, nil
88+
case agaapi.GlobalAcceleratorProtocolUDP:
89+
return agamodel.ProtocolUDP, nil
90+
default:
91+
return "", errors.Errorf("unsupported protocol: %s", *listener.Protocol)
92+
}
93+
}
94+
95+
// buildListenerPortRanges determines the port ranges for the listener
96+
func buildListenerPortRanges(_ context.Context, listener agaapi.GlobalAcceleratorListener) ([]agamodel.PortRange, error) {
97+
if listener.PortRanges == nil {
98+
// TODO: Auto-discovery feature - Auto-determine port ranges from endpoints if nil
99+
// Return error until auto-discovery feature is implemented
100+
return []agamodel.PortRange{}, errors.New("listener port ranges must be specified (auto-discovery not yet implemented)")
101+
}
102+
103+
var portRanges []agamodel.PortRange
104+
for _, pr := range *listener.PortRanges {
105+
// Required validations are already done webhooks and CEL
106+
portRanges = append(portRanges, agamodel.PortRange{
107+
FromPort: pr.FromPort,
108+
ToPort: pr.ToPort,
109+
})
110+
}
111+
return portRanges, nil
112+
}
113+
114+
// buildListenerClientAffinity determines the client affinity for the listener
115+
func buildListenerClientAffinity(_ context.Context, listener agaapi.GlobalAcceleratorListener) agamodel.ClientAffinity {
116+
switch listener.ClientAffinity {
117+
case agaapi.ClientAffinitySourceIP:
118+
return agamodel.ClientAffinitySourceIP
119+
default:
120+
// Default to NONE as per AWS Global Accelerator behavior
121+
return agamodel.ClientAffinityNone
122+
}
123+
}

0 commit comments

Comments
 (0)