@@ -5,18 +5,22 @@ import (
55 "fmt"
66 "math"
77 "net"
8+ "net/netip"
89 "sync"
910
1011 "github.com/vishvananda/netlink"
1112 "golang.org/x/sys/unix"
1213
14+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15+ "k8s.io/apimachinery/pkg/labels"
1316 "k8s.io/apimachinery/pkg/util/sets"
1417 corev1informers "k8s.io/client-go/informers/core/v1"
1518 corev1listers "k8s.io/client-go/listers/core/v1"
1619 "k8s.io/client-go/tools/cache"
1720 "k8s.io/client-go/util/retry"
1821 "k8s.io/klog/v2"
1922
23+ "github.com/gaissmai/cidrtree"
2024 egressipv1 "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressip/v1"
2125 egressipinformers "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressip/v1/apis/informers/externalversions/egressip/v1"
2226 egressiplisters "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressip/v1/apis/listers/egressip/v1"
@@ -173,13 +177,15 @@ func (mic *MarkIPsCache) setSyncdOnce() {
173177 mic .mu .Unlock ()
174178}
175179
176- type BridgeEIPAddrManager struct {
180+ type bridgeEIPAddrManager struct {
177181 nodeName string
178182 bridgeName string
179183 nodeAnnotationMu sync.Mutex
180184 eIPLister egressiplisters.EgressIPLister
181185 eIPInformer cache.SharedIndexInformer
182186 nodeLister corev1listers.NodeLister
187+ podLister corev1listers.PodLister
188+ namespaceLister corev1listers.NamespaceLister
183189 kube kube.Interface
184190 addrManager * linkmanager.Controller
185191 cache * MarkIPsCache
@@ -190,31 +196,173 @@ type BridgeEIPAddrManager struct {
190196// prior to restarting.
191197// It provides the assigned IPs info node IP handler. Node IP handler must not consider assigned EgressIP IPs as possible node IPs.
192198// Openflow manager must generate the SNAT openflow conditional on packet marks and therefore needs access to EIP IPs and associated packet marks.
193- // BridgeEIPAddrManager must be able to force Openflow manager to resync if EgressIP assignment for the node changes.
194- func NewBridgeEIPAddrManager (nodeName , bridgeName string , linkManager * linkmanager.Controller ,
195- kube kube.Interface , eIPInformer egressipinformers.EgressIPInformer , nodeInformer corev1informers.NodeInformer ) * BridgeEIPAddrManager {
196- return & BridgeEIPAddrManager {
199+ // bridgeEIPAddrManager must be able to force Openflow manager to resync if EgressIP assignment for the node changes.
200+ func newBridgeEIPAddrManager (nodeName , bridgeName string , linkManager * linkmanager.Controller ,
201+ kube kube.Interface , eIPInformer egressipinformers.EgressIPInformer , nodeInformer corev1informers.NodeInformer ,
202+ podInformer corev1informers.PodInformer , namespaceInformer corev1informers.NamespaceInformer ) * bridgeEIPAddrManager {
203+ return & bridgeEIPAddrManager {
197204 nodeName : nodeName , // k8 node name
198205 bridgeName : bridgeName , // bridge name for which EIP IPs are managed
199206 nodeAnnotationMu : sync.Mutex {}, // mu for updating Node annotation
200207 eIPLister : eIPInformer .Lister (),
201208 eIPInformer : eIPInformer .Informer (),
202209 nodeLister : nodeInformer .Lister (),
210+ podLister : podInformer .Lister (),
211+ namespaceLister : namespaceInformer .Lister (),
203212 kube : kube ,
204213 addrManager : linkManager ,
205214 cache : NewMarkIPsCache (), // cache to store pkt mark -> EIP IP.
206215 }
207216}
208217
209- func (g * BridgeEIPAddrManager ) GetCache () * MarkIPsCache {
218+ func (g * bridgeEIPAddrManager ) GetCache () * MarkIPsCache {
210219 return g .cache
211220}
212221
213- func (g * BridgeEIPAddrManager ) AddEgressIP (eip * egressipv1.EgressIP ) (bool , error ) {
222+ // NewBridgeEIPAddrManager creates a new bridge EIP address manager
223+ func NewBridgeEIPAddrManager (nodeName , bridgeName string , linkManager * linkmanager.Controller ,
224+ kube kube.Interface , eIPInformer egressipinformers.EgressIPInformer , nodeInformer corev1informers.NodeInformer ) * bridgeEIPAddrManager {
225+ return newBridgeEIPAddrManager (nodeName , bridgeName , linkManager , kube , eIPInformer , nodeInformer , nil , nil )
226+ }
227+
228+ // findLinkOnSameNetworkAsIPUsingLPM finds the correct interface for an EgressIP using longest prefix match
229+ // This is based on the implementation in the secondary EgressIP controller
230+ func (g * bridgeEIPAddrManager ) findLinkOnSameNetworkAsIPUsingLPM (ip net.IP , v4 , v6 bool ) (bool , netlink.Link , error ) {
231+ prefixLinks := map [string ]netlink.Link {} // key is network CIDR
232+ prefixes := make ([]netip.Prefix , 0 )
233+ links , err := util .GetNetLinkOps ().LinkList ()
234+ if err != nil {
235+ return false , nil , fmt .Errorf ("failed to list links: %v" , err )
236+ }
237+ for _ , link := range links {
238+ link := link
239+ linkPrefixes , err := g .getFilteredPrefixes (link , v4 , v6 )
240+ if err != nil {
241+ klog .Errorf ("Failed to get address from link %s: %v" , link .Attrs ().Name , err )
242+ continue
243+ }
244+ prefixes = append (prefixes , linkPrefixes ... )
245+ // create lookup table for later retrieval
246+ for _ , prefixFound := range linkPrefixes {
247+ _ , ipNet , err := net .ParseCIDR (prefixFound .String ())
248+ if err != nil {
249+ klog .Errorf ("Egress IP: skipping prefix %q due to parsing CIDR error: %v" , prefixFound .String (), err )
250+ continue
251+ }
252+ prefixLinks [ipNet .String ()] = link
253+ }
254+ }
255+ lpmTree := cidrtree .New (prefixes ... )
256+ addr , err := netip .ParseAddr (ip .String ())
257+ if err != nil {
258+ return false , nil , fmt .Errorf ("failed to convert IP %s to netip addr: %v" , ip .String (), err )
259+ }
260+ network , found := lpmTree .Lookup (addr )
261+ if ! found {
262+ return false , nil , nil
263+ }
264+ link , ok := prefixLinks [network .String ()]
265+ if ! ok {
266+ return false , nil , nil
267+ }
268+ return true , link , nil
269+ }
270+
271+ // getFilteredPrefixes returns address Prefixes from interfaces with filtering
272+ func (g * bridgeEIPAddrManager ) getFilteredPrefixes (link netlink.Link , v4 , v6 bool ) ([]netip.Prefix , error ) {
273+ validAddresses := make ([]netip.Prefix , 0 )
274+ flags := link .Attrs ().Flags .String ()
275+ if ! g .isLinkUp (flags ) {
276+ return validAddresses , nil
277+ }
278+ linkAddresses , err := util .GetFilteredInterfaceAddrs (link , v4 , v6 )
279+ if err != nil {
280+ return validAddresses , err
281+ }
282+ for _ , addr := range linkAddresses {
283+ // Skip single-host addresses (/32 for IPv4, /128 for IPv6)
284+ ones , bits := addr .Mask .Size ()
285+ if ones == bits {
286+ continue
287+ }
288+ // Convert to netip.Prefix
289+ prefix , err := netip .ParsePrefix (addr .String ())
290+ if err != nil {
291+ klog .Errorf ("Failed to parse address %s as netip.Prefix: %v" , addr .String (), err )
292+ continue
293+ }
294+ validAddresses = append (validAddresses , prefix )
295+ }
296+ return validAddresses , nil
297+ }
298+
299+ // isLinkUp checks if a network link is up
300+ func (g * bridgeEIPAddrManager ) isLinkUp (flags string ) bool {
301+ return (flags != "" && (flags == "up" || flags == "up|broadcast|multicast" ))
302+ }
303+
304+ // hasMatchingPods checks if there are any pods matching the EgressIP's namespace and pod selectors
305+ func (g * bridgeEIPAddrManager ) hasMatchingPods (eip * egressipv1.EgressIP ) (bool , error ) {
306+ namespaceSelector , err := metav1 .LabelSelectorAsSelector (& eip .Spec .NamespaceSelector )
307+ if err != nil {
308+ return false , fmt .Errorf ("failed to convert namespace selector: %v" , err )
309+ }
310+
311+ podSelector , err := metav1 .LabelSelectorAsSelector (& eip .Spec .PodSelector )
312+ if err != nil {
313+ return false , fmt .Errorf ("failed to convert pod selector: %v" , err )
314+ }
315+
316+ // Get all namespaces and filter by label selector
317+ allNamespaces , err := g .namespaceLister .List (labels .Everything ())
318+ if err != nil {
319+ return false , fmt .Errorf ("failed to list all namespaces: %v" , err )
320+ }
321+
322+ for _ , namespace := range allNamespaces {
323+ namespaceLabels := labels .Set (namespace .Labels )
324+ if ! namespaceSelector .Matches (namespaceLabels ) {
325+ continue
326+ }
327+
328+ // Get all pods in this namespace and filter by pod selector
329+ allPods , err := g .podLister .Pods (namespace .Name ).List (labels .Everything ())
330+ if err != nil {
331+ return false , fmt .Errorf ("failed to list pods in namespace %s: %v" , namespace .Name , err )
332+ }
333+
334+ for _ , pod := range allPods {
335+ podLabels := labels .Set (pod .Labels )
336+ if ! podSelector .Matches (podLabels ) {
337+ continue
338+ }
339+
340+ // Check if pod is actually running and has IPs
341+ if ! util .PodCompleted (pod ) && ! util .PodWantsHostNetwork (pod ) && len (pod .Status .PodIPs ) > 0 {
342+ return true , nil
343+ }
344+ }
345+ }
346+
347+ return false , nil
348+ }
349+
350+ func (g * bridgeEIPAddrManager ) addEgressIP (eip * egressipv1.EgressIP ) (bool , error ) {
214351 var isUpdated bool
215352 if ! util .IsEgressIPMarkSet (eip .Annotations ) {
216353 return isUpdated , nil
217354 }
355+
356+ // First check if there are any matching pods for this EgressIP
357+ hasMatchingPods , err := g .hasMatchingPods (eip )
358+ if err != nil {
359+ return isUpdated , fmt .Errorf ("failed to check for matching pods: %v" , err )
360+ }
361+ if ! hasMatchingPods {
362+ klog .V (5 ).Infof ("EgressIP %s has no matching pods yet, skipping bridge IP assignment" , eip .Name )
363+ return isUpdated , nil
364+ }
365+
218366 for _ , status := range eip .Status .Items {
219367 if status .Node != g .nodeName {
220368 continue
@@ -223,6 +371,32 @@ func (g *BridgeEIPAddrManager) AddEgressIP(eip *egressipv1.EgressIP) (bool, erro
223371 if err != nil {
224372 return isUpdated , fmt .Errorf ("failed to add EgressIP gateway config because unable to extract config from EgressIP obj: %v" , err )
225373 }
374+
375+ // Use longest prefix matching to determine the correct interface for this EgressIP
376+ egressIP := net .ParseIP (status .EgressIP )
377+ if egressIP == nil {
378+ return isUpdated , fmt .Errorf ("failed to parse EgressIP %s" , status .EgressIP )
379+ }
380+
381+ isEIPv4 := egressIP .To4 () != nil
382+ found , correctLink , err := g .findLinkOnSameNetworkAsIPUsingLPM (egressIP , isEIPv4 , ! isEIPv4 )
383+ if err != nil {
384+ return isUpdated , fmt .Errorf ("failed to find correct interface using LPM: %v" , err )
385+ }
386+ if ! found {
387+ klog .Warningf ("No suitable interface found for EgressIP %s using LPM" , status .EgressIP )
388+ return isUpdated , nil
389+ }
390+
391+ // Only proceed if the bridge we're managing is the correct interface
392+ if correctLink .Attrs ().Name != g .bridgeName {
393+ klog .V (5 ).Infof ("EgressIP %s should be assigned to interface %s, not bridge %s, skipping" ,
394+ status .EgressIP , correctLink .Attrs ().Name , g .bridgeName )
395+ return isUpdated , nil
396+ }
397+
398+ klog .Infof ("Adding EgressIP %s to bridge %s based on LPM and matching pods" , status .EgressIP , g .bridgeName )
399+
226400 // must always add to cache before adding IP because we want to inform node ip handler that this is not a valid node IP
227401 g .cache .insertMarkIP (pktMark , ip )
228402 if err = g .addIPToAnnotation (ip ); err != nil {
@@ -237,7 +411,7 @@ func (g *BridgeEIPAddrManager) AddEgressIP(eip *egressipv1.EgressIP) (bool, erro
237411 return isUpdated , nil
238412}
239413
240- func (g * BridgeEIPAddrManager ) UpdateEgressIP (oldEIP , newEIP * egressipv1.EgressIP ) (bool , error ) {
414+ func (g * bridgeEIPAddrManager ) UpdateEgressIP (oldEIP , newEIP * egressipv1.EgressIP ) (bool , error ) {
241415 var isUpdated bool
242416 // at most, one status item for this node will be found.
243417 for _ , oldStatus := range oldEIP .Status .Items {
@@ -293,7 +467,7 @@ func (g *BridgeEIPAddrManager) UpdateEgressIP(oldEIP, newEIP *egressipv1.EgressI
293467 return isUpdated , nil
294468}
295469
296- func (g * BridgeEIPAddrManager ) DeleteEgressIP (eip * egressipv1.EgressIP ) (bool , error ) {
470+ func (g * bridgeEIPAddrManager ) DeleteEgressIP (eip * egressipv1.EgressIP ) (bool , error ) {
297471 var isUpdated bool
298472 if ! util .IsEgressIPMarkSet (eip .Annotations ) {
299473 return isUpdated , nil
@@ -322,7 +496,7 @@ func (g *BridgeEIPAddrManager) DeleteEgressIP(eip *egressipv1.EgressIP) (bool, e
322496 return isUpdated , nil
323497}
324498
325- func (g * BridgeEIPAddrManager ) SyncEgressIP (objs []interface {}) error {
499+ func (g * bridgeEIPAddrManager ) SyncEgressIP (objs []interface {}) error {
326500 // caller must synchronise
327501 annotIPs , err := g .getAnnotationIPs ()
328502 if err != nil {
@@ -380,7 +554,7 @@ func (g *BridgeEIPAddrManager) SyncEgressIP(objs []interface{}) error {
380554
381555// addIPToAnnotation adds an address to the collection of existing addresses stored in the nodes annotation. Caller
382556// may repeat addition of addresses without care for duplicate addresses being added.
383- func (g * BridgeEIPAddrManager ) addIPToAnnotation (candidateIP net.IP ) error {
557+ func (g * bridgeEIPAddrManager ) addIPToAnnotation (candidateIP net.IP ) error {
384558 g .nodeAnnotationMu .Lock ()
385559 defer g .nodeAnnotationMu .Unlock ()
386560 return retry .RetryOnConflict (retry .DefaultRetry , func () error {
@@ -412,7 +586,7 @@ func (g *BridgeEIPAddrManager) addIPToAnnotation(candidateIP net.IP) error {
412586
413587// deleteIPsFromAnnotation deletes address from annotation. If multiple users, callers must synchronise.
414588// deletion of address that doesn't exist will not cause an error.
415- func (g * BridgeEIPAddrManager ) deleteIPsFromAnnotation (candidateIPs ... net.IP ) error {
589+ func (g * bridgeEIPAddrManager ) deleteIPsFromAnnotation (candidateIPs ... net.IP ) error {
416590 g .nodeAnnotationMu .Lock ()
417591 defer g .nodeAnnotationMu .Unlock ()
418592 return retry .RetryOnConflict (retry .DefaultRetry , func () error {
@@ -446,15 +620,15 @@ func (g *BridgeEIPAddrManager) deleteIPsFromAnnotation(candidateIPs ...net.IP) e
446620 })
447621}
448622
449- func (g * BridgeEIPAddrManager ) addIPBridge (ip net.IP ) error {
623+ func (g * bridgeEIPAddrManager ) addIPBridge (ip net.IP ) error {
450624 link , err := util .GetNetLinkOps ().LinkByName (g .bridgeName )
451625 if err != nil {
452626 return fmt .Errorf ("failed to get link obj by name %s: %v" , g .bridgeName , err )
453627 }
454628 return g .addrManager .AddAddress (getEIPBridgeNetlinkAddress (ip , link .Attrs ().Index ))
455629}
456630
457- func (g * BridgeEIPAddrManager ) deleteIPBridge (ip net.IP ) error {
631+ func (g * bridgeEIPAddrManager ) deleteIPBridge (ip net.IP ) error {
458632 link , err := util .GetNetLinkOps ().LinkByName (g .bridgeName )
459633 if err != nil {
460634 return fmt .Errorf ("failed to get link obj by name %s: %v" , g .bridgeName , err )
@@ -464,7 +638,7 @@ func (g *BridgeEIPAddrManager) deleteIPBridge(ip net.IP) error {
464638
465639// getAnnotationIPs retrieves the egress IP annotation from the current node Nodes object. If multiple users, callers must synchronise.
466640// if annotation isn't present, empty set is returned
467- func (g * BridgeEIPAddrManager ) getAnnotationIPs () ([]net.IP , error ) {
641+ func (g * bridgeEIPAddrManager ) getAnnotationIPs () ([]net.IP , error ) {
468642 node , err := g .nodeLister .Get (g .nodeName )
469643 if err != nil {
470644 return nil , fmt .Errorf ("failed to get node %s from lister: %v" , g .nodeName , err )
0 commit comments