diff --git a/go-controller/pkg/controllermanager/node_controller_manager.go b/go-controller/pkg/controllermanager/node_controller_manager.go index 716cb76869..ec9f167f44 100644 --- a/go-controller/pkg/controllermanager/node_controller_manager.go +++ b/go-controller/pkg/controllermanager/node_controller_manager.go @@ -75,14 +75,18 @@ func (ncm *NodeControllerManager) CleanupStaleNetworks(validNetworks ...util.Net if !util.IsNetworkSegmentationSupportEnabled() { return nil } - validVRFDevices := make(sets.Set[string]) - for _, network := range validNetworks { - if !network.IsPrimaryNetwork() { - continue + // in DPU mode, vrfManager would be nil + if ncm.vrfManager != nil { + validVRFDevices := make(sets.Set[string]) + for _, network := range validNetworks { + if !network.IsPrimaryNetwork() { + continue + } + validVRFDevices.Insert(util.GetNetworkVRFName(network)) } - validVRFDevices.Insert(util.GetNetworkVRFName(network)) + return ncm.vrfManager.Repair(validVRFDevices) } - return ncm.vrfManager.Repair(validVRFDevices) + return nil } // newCommonNetworkControllerInfo creates and returns the base node network controller info @@ -126,7 +130,7 @@ func NewNodeControllerManager(ovnClient *util.OVNClientset, wf factory.NodeWatch return nil, err } } - if util.IsNetworkSegmentationSupportEnabled() { + if util.IsNetworkSegmentationSupportEnabled() && config.OvnKubeNode.Mode != ovntypes.NodeModeDPU { ncm.vrfManager = vrfmanager.NewController(ncm.routeManager) ncm.ruleManager = iprulemanager.NewController(config.IPv4Mode, config.IPv6Mode) } diff --git a/go-controller/pkg/node/bridgeconfig/bridgeflows.go b/go-controller/pkg/node/bridgeconfig/bridgeflows.go index 0133459034..4d2ee1240f 100644 --- a/go-controller/pkg/node/bridgeconfig/bridgeflows.go +++ b/go-controller/pkg/node/bridgeconfig/bridgeflows.go @@ -399,9 +399,9 @@ func (b *BridgeConfiguration) flowsForDefaultBridge(extraIPs []net.IP) ([]string dftFlows = append(dftFlows, fmt.Sprintf("cookie=%s, priority=250, table=2, ip, pkt_mark=%s, "+ - "actions=set_field:%s->eth_dst,output:%s", + "actions=set_field:%s->eth_dst,%soutput:%s", nodetypes.DefaultOpenFlowCookie, netConfig.PktMark, - bridgeMacAddress, netConfig.OfPortPatch)) + bridgeMacAddress, mod_vlan_id, netConfig.OfPortPatch)) } } @@ -434,9 +434,9 @@ func (b *BridgeConfiguration) flowsForDefaultBridge(extraIPs []net.IP) ([]string nodetypes.DefaultOpenFlowCookie, netConfig.V6MasqIPs.ManagementPort.IP.String())) dftFlows = append(dftFlows, fmt.Sprintf("cookie=%s, priority=250, table=2, ip6, pkt_mark=%s, "+ - "actions=set_field:%s->eth_dst,output:%s", + "actions=set_field:%s->eth_dst,%soutput:%s", nodetypes.DefaultOpenFlowCookie, netConfig.PktMark, - bridgeMacAddress, netConfig.OfPortPatch)) + bridgeMacAddress, mod_vlan_id, netConfig.OfPortPatch)) } } diff --git a/go-controller/pkg/node/default_node_network_controller.go b/go-controller/pkg/node/default_node_network_controller.go index 5f0878f64e..1daf1c6baf 100644 --- a/go-controller/pkg/node/default_node_network_controller.go +++ b/go-controller/pkg/node/default_node_network_controller.go @@ -153,7 +153,7 @@ func newDefaultNodeNetworkController(cnnci *CommonNodeNetworkControllerInfo, sto routeManager: routeManager, ovsClient: ovsClient, } - if util.IsNetworkSegmentationSupportEnabled() { + if util.IsNetworkSegmentationSupportEnabled() && config.OvnKubeNode.Mode != types.NodeModeDPU { c.udnHostIsolationManager = NewUDNHostIsolationManager(config.IPv4Mode, config.IPv6Mode, cnnci.watchFactory.PodCoreInformer(), cnnci.name, cnnci.recorder) } @@ -936,6 +936,9 @@ func (nc *DefaultNodeNetworkController) Init(ctx context.Context) error { if err != nil { return err } + } + + if config.OvnKubeNode.Mode != types.NodeModeDPU { if nc.udnHostIsolationManager != nil { if err = nc.udnHostIsolationManager.Start(ctx); err != nil { return err @@ -980,9 +983,11 @@ func (nc *DefaultNodeNetworkController) Init(ctx context.Context) error { nodeAnnotator := kube.NewNodeAnnotator(nc.Kube, node.Name) - // Use the device from environment when the DP resource name is specified. - if err := configureMgmtPortNetdevFromResource(); err != nil { - return err + if config.OvnKubeNode.Mode != types.NodeModeDPU { + // Use the device from environment when the DP resource name is specified. + if err := configureMgmtPortNetdevFromResource(); err != nil { + return err + } } if config.OvnKubeNode.Mode == types.NodeModeDPUHost { @@ -1043,6 +1048,11 @@ func (nc *DefaultNodeNetworkController) Init(ctx context.Context) error { return err } nc.Gateway = gw + } else { + err = nc.initGatewayDPUHostPreStart(nc.nodeAddress, nodeAnnotator) + if err != nil { + return err + } } if err := level.Set(strconv.Itoa(config.Logging.Level)); err != nil { @@ -1081,7 +1091,7 @@ func (nc *DefaultNodeNetworkController) Start(ctx context.Context) error { // Complete gateway initialization if config.OvnKubeNode.Mode == types.NodeModeDPUHost { - err = nc.initGatewayDPUHost(nc.nodeAddress, nodeAnnotator) + err = nc.initGatewayDPUHost() if err != nil { return err } diff --git a/go-controller/pkg/node/gateway.go b/go-controller/pkg/node/gateway.go index 97e7baeecb..4a7416c64e 100644 --- a/go-controller/pkg/node/gateway.go +++ b/go-controller/pkg/node/gateway.go @@ -429,15 +429,17 @@ func gatewayInitInternal(nodeName, gwIntf, egressGatewayIntf string, gwNextHops } } - // Set static FDB entry for sharedGW MAC. - // If `GatewayIfaceRep` port is present, use it instead of LOCAL (bridge name). - gwport := gatewayBridge.GetBridgeName() // Default is LOCAL port for the bridge. - if repPort := gatewayBridge.GetGatewayIfaceRep(); repPort != "" { // We have an accelerated switchdev device for GW. - gwport = repPort - } + if config.OvnKubeNode.Mode != types.NodeModeDPUHost { + // Set static FDB entry for sharedGW MAC. + // If `GatewayIfaceRep` port is present, use it instead of LOCAL (bridge name). + gwport := gatewayBridge.GetBridgeName() // Default is LOCAL port for the bridge. + if repPort := gatewayBridge.GetGatewayIfaceRep(); repPort != "" { // We have an accelerated switchdev device for GW. + gwport = repPort + } - if err := util.SetStaticFDBEntry(gatewayBridge.GetBridgeName(), gwport, gatewayBridge.GetMAC()); err != nil { - return nil, nil, err + if err := util.SetStaticFDBEntry(gatewayBridge.GetBridgeName(), gwport, gatewayBridge.GetMAC()); err != nil { + return nil, nil, err + } } l3GwConfig := util.L3GatewayConfig{ @@ -466,26 +468,46 @@ func (g *gateway) GetGatewayBridgeIface() string { } func (g *gateway) GetGatewayIface() string { - return g.openflowManager.defaultBridge.GetGatewayIface() + if config.OvnKubeNode.Mode != types.NodeModeDPUHost { + if g.openflowManager == nil { + return "" + } + return g.openflowManager.defaultBridge.GetGatewayIface() + } else { + return config.Gateway.Interface + } } // SetDefaultGatewayBridgeMAC updates the mac address for the OFM used to render flows with func (g *gateway) SetDefaultGatewayBridgeMAC(macAddr net.HardwareAddr) { + if config.OvnKubeNode.Mode == types.NodeModeDPUHost { + return + } g.openflowManager.setDefaultBridgeMAC(macAddr) klog.Infof("Default gateway bridge MAC address updated to %s", macAddr) } func (g *gateway) SetDefaultPodNetworkAdvertised(isPodNetworkAdvertised bool) { + if config.OvnKubeNode.Mode == types.NodeModeDPUHost { + return + } g.openflowManager.defaultBridge.GetNetworkConfig(types.DefaultNetworkName).Advertised.Store(isPodNetworkAdvertised) } func (g *gateway) GetDefaultPodNetworkAdvertised() bool { + if config.OvnKubeNode.Mode == types.NodeModeDPUHost { + return false + } return g.openflowManager.defaultBridge.GetNetworkConfig(types.DefaultNetworkName).Advertised.Load() } // SetDefaultBridgeGARPDropFlows will enable flows to drop GARPs if the openflow // manager has been initialized. func (g *gateway) SetDefaultBridgeGARPDropFlows(isDropped bool) { + if config.OvnKubeNode.Mode == types.NodeModeDPUHost { + return + } + if g.openflowManager == nil { return } @@ -495,14 +517,22 @@ func (g *gateway) SetDefaultBridgeGARPDropFlows(isDropped bool) { // Reconcile handles triggering updates to different components of a gateway, like OFM, Services func (g *gateway) Reconcile() error { klog.Info("Reconciling gateway with updates") - if err := g.openflowManager.updateBridgeFlowCache(g.nodeIPManager.ListAddresses()); err != nil { - return err + if config.OvnKubeNode.Mode != types.NodeModeDPUHost { + if g.openflowManager != nil { + if err := g.openflowManager.updateBridgeFlowCache(g.nodeIPManager.ListAddresses()); err != nil { + return err + } + // let's sync these flows immediately + g.openflowManager.requestFlowSync() + } } - // let's sync these flows immediately - g.openflowManager.requestFlowSync() - err := g.updateSNATRules() - if err != nil { - return err + // TBD updateSNATRules() gets node host-cidr by accessing gateway.nodeIPManager, which does not + // exist in dpu-host mode. + if config.OvnKubeNode.Mode == types.NodeModeFull { + err := g.updateSNATRules() + if err != nil { + return err + } } // Services create OpenFlow flows as well, need to update them all if g.servicesRetryFramework != nil { diff --git a/go-controller/pkg/node/gateway_init.go b/go-controller/pkg/node/gateway_init.go index b4d11d69cf..0b48e215a2 100644 --- a/go-controller/pkg/node/gateway_init.go +++ b/go-controller/pkg/node/gateway_init.go @@ -384,13 +384,9 @@ func interfaceForEXGW(intfName string) string { return intfName } -func (nc *DefaultNodeNetworkController) initGatewayDPUHost(kubeNodeIP net.IP, nodeAnnotator kube.Annotator) error { - // A DPU host gateway is complementary to the shared gateway running - // on the DPU embedded CPU. it performs some initializations and - // watch on services for iptable rule updates and run a loadBalancerHealth checker - // Note: all K8s Node related annotations are handled from DPU. - klog.Info("Initializing Shared Gateway Functionality on DPU host") - var err error +// TODO(adrianc): revisit if support for nodeIPManager is needed. +func (nc *DefaultNodeNetworkController) initGatewayDPUHostPreStart(kubeNodeIP net.IP, nodeAnnotator kube.Annotator) error { + klog.Info("Initializing Shared Gateway Functionality for Gateway PreStart on DPU host") // Find the network interface that has the Kubernetes node IP assigned to it // This interface will be used for DPU host gateway operations @@ -470,18 +466,49 @@ func (nc *DefaultNodeNetworkController) initGatewayDPUHost(kubeNodeIP net.IP, no return err } - gw := &gateway{ + if err = addHostMACBindings(kubeIntf); err != nil { + return fmt.Errorf("failed to add MAC bindings for service routing: %w", err) + } + + // Configure iptables FORWARD rules for cluster and service CIDRs + // These are needed even in DPU-host mode to allow kernel forwarding + if err := configureForwardingRules(); err != nil { + return fmt.Errorf("failed to configure forwarding rules for DPU host: %v", err) + } + + gatewayNextHops, _, err := getGatewayNextHops() + if err != nil { + return err + } + + nc.Gateway = &gateway{ initFunc: func() error { return nil }, readyFunc: func() (bool, error) { return true, nil }, watchFactory: nc.watchFactory.(*factory.WatchFactory), + nextHops: gatewayNextHops, } + return nil +} - // TODO(adrianc): revisit if support for nodeIPManager is needed. +func (nc *DefaultNodeNetworkController) initGatewayDPUHost() error { + // A DPU host gateway is complementary to the shared gateway running + // on the DPU embedded CPU. it performs some initializations and + // watch on services for iptable rule updates and run a loadBalancerHealth checker + // Note: all K8s Node related annotations are handled from DPU. + klog.Info("Initializing Shared Gateway Functionality for Gateway Start on DPU host") + var err error + // TODO(adrianc): revisit if support for nodeIPManager is needed. + gw := nc.Gateway.(*gateway) if config.Gateway.NodeportEnable { if err := initSharedGatewayIPTables(); err != nil { return err } + if util.IsNetworkSegmentationSupportEnabled() { + if err := configureUDNServicesNFTables(); err != nil { + return fmt.Errorf("unable to configure UDN nftables: %w", err) + } + } gw.nodePortWatcherIptables = newNodePortWatcherIptables(nc.networkManager) gw.loadBalancerHealthChecker = newLoadBalancerHealthChecker(nc.name, nc.watchFactory) portClaimWatcher, err := newPortClaimWatcher(nc.recorder) @@ -491,10 +518,6 @@ func (nc *DefaultNodeNetworkController) initGatewayDPUHost(kubeNodeIP net.IP, no gw.portClaimWatcher = portClaimWatcher } - if err := addHostMACBindings(kubeIntf); err != nil { - return fmt.Errorf("failed to add MAC bindings for service routing") - } - err = gw.Init(nc.stopChan, nc.wg) nc.Gateway = gw return err diff --git a/go-controller/pkg/node/gateway_init_linux_test.go b/go-controller/pkg/node/gateway_init_linux_test.go index 7e1f330937..b46657c5c2 100644 --- a/go-controller/pkg/node/gateway_init_linux_test.go +++ b/go-controller/pkg/node/gateway_init_linux_test.go @@ -925,7 +925,9 @@ func shareGatewayInterfaceDPUHostTest(app *cli.App, testNS ns.NetNS, uplinkName, nodeAnnotator := kube.NewNodeAnnotator(k, existingNode.Name) - err := nc.initGatewayDPUHost(net.ParseIP(hostIP), nodeAnnotator) + err := nc.initGatewayDPUHostPreStart(net.ParseIP(hostIP), nodeAnnotator) + Expect(err).NotTo(HaveOccurred()) + err = nc.initGatewayDPUHost() Expect(err).NotTo(HaveOccurred()) link, err := netlink.LinkByName(uplinkName) diff --git a/go-controller/pkg/node/gateway_iptables.go b/go-controller/pkg/node/gateway_iptables.go index 90bffbe91f..6491af8c6b 100644 --- a/go-controller/pkg/node/gateway_iptables.go +++ b/go-controller/pkg/node/gateway_iptables.go @@ -349,6 +349,27 @@ func delExternalBridgeServiceForwardingRules(cidrs []*net.IPNet) error { return deleteIptRules(getGatewayForwardRules(cidrs)) } +// configureForwardingRules sets up or removes iptables FORWARD rules for cluster and service CIDRs +// based on config.Gateway.DisableForwarding setting +func configureForwardingRules() error { + var subnets []*net.IPNet + for _, subnet := range config.Default.ClusterSubnets { + subnets = append(subnets, subnet.CIDR) + } + subnets = append(subnets, config.Kubernetes.ServiceCIDRs...) + + if config.Gateway.DisableForwarding { + if err := initExternalBridgeServiceForwardingRules(subnets); err != nil { + return fmt.Errorf("failed to add iptables FORWARD rules: %v", err) + } + } else { + if err := delExternalBridgeServiceForwardingRules(subnets); err != nil { + return fmt.Errorf("failed to delete iptables FORWARD rules: %v", err) + } + } + return nil +} + func getLocalGatewayFilterRules(ifname string, cidr *net.IPNet) []nodeipt.Rule { // Allow packets to/from the gateway interface in case defaults deny protocol := getIPTablesProtocol(cidr.IP.String()) diff --git a/go-controller/pkg/node/gateway_iptables_test.go b/go-controller/pkg/node/gateway_iptables_test.go new file mode 100644 index 0000000000..4dc46586b8 --- /dev/null +++ b/go-controller/pkg/node/gateway_iptables_test.go @@ -0,0 +1,291 @@ +//go:build linux +// +build linux + +package node + +import ( + "fmt" + "net" + "runtime" + "sync" + "time" + + "github.com/containernetworking/plugins/pkg/ns" + "github.com/containernetworking/plugins/pkg/testutils" + "github.com/coreos/go-iptables/iptables" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config" + nodeipt "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/node/iptables" + ovntest "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/testing" +) + +var _ = Describe("Gateway IPTables", func() { + var testNS ns.NetNS + var iptCtrl *nodeipt.Controller + + BeforeEach(func() { + if ovntest.NoRoot() { + Skip("Test requires root privileges") + } + + var err error + runtime.LockOSThread() + testNS, err = testutils.NewNS() + Expect(err).NotTo(HaveOccurred()) + + // Start iptables controller + iptCtrl = nodeipt.NewController() + stopCh := make(chan struct{}) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + _ = testNS.Do(func(ns.NetNS) error { + iptCtrl.Run(stopCh, 50*time.Millisecond) + return nil + }) + }() + + DeferCleanup(func() { + close(stopCh) + wg.Wait() + Expect(testNS.Close()).To(Succeed()) + Expect(testutils.UnmountNS(testNS)).To(Succeed()) + runtime.UnlockOSThread() + }) + }) + + Context("configureForwardingRules", func() { + var originalDisableForwarding bool + var originalClusterSubnets []config.CIDRNetworkEntry + var originalServiceCIDRs []*net.IPNet + + BeforeEach(func() { + // Save original config + originalDisableForwarding = config.Gateway.DisableForwarding + originalClusterSubnets = config.Default.ClusterSubnets + originalServiceCIDRs = config.Kubernetes.ServiceCIDRs + + // Setup test config + config.IPv4Mode = true + config.IPv6Mode = false + }) + + AfterEach(func() { + // Restore original config + config.Gateway.DisableForwarding = originalDisableForwarding + config.Default.ClusterSubnets = originalClusterSubnets + config.Kubernetes.ServiceCIDRs = originalServiceCIDRs + }) + + It("should add FORWARD rules when DisableForwarding is true", func() { + // Setup test configuration + config.Gateway.DisableForwarding = true + config.Default.ClusterSubnets = []config.CIDRNetworkEntry{ + {CIDR: ovntest.MustParseIPNet("10.128.0.0/14")}, + } + config.Kubernetes.ServiceCIDRs = []*net.IPNet{ + ovntest.MustParseIPNet("172.30.0.0/16"), + } + config.Gateway.MasqueradeIPs.V4OVNMasqueradeIP = net.ParseIP("169.254.0.1") + + err := testNS.Do(func(ns.NetNS) error { + return configureForwardingRules() + }) + Expect(err).NotTo(HaveOccurred()) + + // Verify rules were added + Eventually(func() error { + return testNS.Do(func(ns.NetNS) error { + ipt, err := iptables.NewWithProtocol(iptables.ProtocolIPv4) + if err != nil { + return err + } + + // Check cluster subnet rules + exists, err := ipt.Exists("filter", "FORWARD", "-s", "10.128.0.0/14", "-j", "ACCEPT") + if err != nil { + return err + } + if !exists { + return fmt.Errorf("cluster subnet source rule not found") + } + + exists, err = ipt.Exists("filter", "FORWARD", "-d", "10.128.0.0/14", "-j", "ACCEPT") + if err != nil { + return err + } + if !exists { + return fmt.Errorf("cluster subnet dest rule not found") + } + + // Check service CIDR rules + exists, err = ipt.Exists("filter", "FORWARD", "-s", "172.30.0.0/16", "-j", "ACCEPT") + if err != nil { + return err + } + if !exists { + return fmt.Errorf("service CIDR source rule not found") + } + + exists, err = ipt.Exists("filter", "FORWARD", "-d", "172.30.0.0/16", "-j", "ACCEPT") + if err != nil { + return err + } + if !exists { + return fmt.Errorf("service CIDR dest rule not found") + } + + // Check masquerade IP rules + exists, err = ipt.Exists("filter", "FORWARD", "-s", "169.254.0.1", "-j", "ACCEPT") + if err != nil { + return err + } + if !exists { + return fmt.Errorf("masquerade IP source rule not found") + } + + exists, err = ipt.Exists("filter", "FORWARD", "-d", "169.254.0.1", "-j", "ACCEPT") + if err != nil { + return err + } + if !exists { + return fmt.Errorf("masquerade IP dest rule not found") + } + + return nil + }) + }, 2*time.Second).Should(Succeed()) + }) + + It("should remove FORWARD rules when DisableForwarding is false", func() { + // Setup test configuration + config.Gateway.DisableForwarding = true + config.Default.ClusterSubnets = []config.CIDRNetworkEntry{ + {CIDR: ovntest.MustParseIPNet("10.128.0.0/14")}, + } + config.Kubernetes.ServiceCIDRs = []*net.IPNet{ + ovntest.MustParseIPNet("172.30.0.0/16"), + } + config.Gateway.MasqueradeIPs.V4OVNMasqueradeIP = net.ParseIP("169.254.0.1") + + // First add the rules + err := testNS.Do(func(ns.NetNS) error { + return configureForwardingRules() + }) + Expect(err).NotTo(HaveOccurred()) + + // Wait for rules to be added + time.Sleep(200 * time.Millisecond) + + // Now change config to remove rules + config.Gateway.DisableForwarding = false + + err = testNS.Do(func(ns.NetNS) error { + return configureForwardingRules() + }) + Expect(err).NotTo(HaveOccurred()) + + // Verify rules were removed + Eventually(func() error { + return testNS.Do(func(ns.NetNS) error { + ipt, err := iptables.NewWithProtocol(iptables.ProtocolIPv4) + if err != nil { + return err + } + + // Check that cluster subnet rules are removed + exists, err := ipt.Exists("filter", "FORWARD", "-s", "10.128.0.0/14", "-j", "ACCEPT") + if err != nil { + return err + } + if exists { + return fmt.Errorf("cluster subnet source rule should be removed") + } + + exists, err = ipt.Exists("filter", "FORWARD", "-d", "10.128.0.0/14", "-j", "ACCEPT") + if err != nil { + return err + } + if exists { + return fmt.Errorf("cluster subnet dest rule should be removed") + } + + return nil + }) + }, 2*time.Second).Should(Succeed()) + }) + + It("should handle multiple cluster subnets and service CIDRs", func() { + // Setup test configuration with multiple CIDRs + config.Gateway.DisableForwarding = true + config.Default.ClusterSubnets = []config.CIDRNetworkEntry{ + {CIDR: ovntest.MustParseIPNet("10.128.0.0/14")}, + {CIDR: ovntest.MustParseIPNet("10.132.0.0/14")}, + } + config.Kubernetes.ServiceCIDRs = []*net.IPNet{ + ovntest.MustParseIPNet("172.30.0.0/16"), + ovntest.MustParseIPNet("172.31.0.0/16"), + } + config.Gateway.MasqueradeIPs.V4OVNMasqueradeIP = net.ParseIP("169.254.0.1") + + err := testNS.Do(func(ns.NetNS) error { + return configureForwardingRules() + }) + Expect(err).NotTo(HaveOccurred()) + + // Verify all rules were added + Eventually(func() error { + return testNS.Do(func(ns.NetNS) error { + ipt, err := iptables.NewWithProtocol(iptables.ProtocolIPv4) + if err != nil { + return err + } + + // Check all cluster subnet rules + for _, subnet := range []string{"10.128.0.0/14", "10.132.0.0/14"} { + exists, err := ipt.Exists("filter", "FORWARD", "-s", subnet, "-j", "ACCEPT") + if err != nil { + return err + } + if !exists { + return fmt.Errorf("cluster subnet source rule for %s not found", subnet) + } + + exists, err = ipt.Exists("filter", "FORWARD", "-d", subnet, "-j", "ACCEPT") + if err != nil { + return err + } + if !exists { + return fmt.Errorf("cluster subnet dest rule for %s not found", subnet) + } + } + + // Check all service CIDR rules + for _, cidr := range []string{"172.30.0.0/16", "172.31.0.0/16"} { + exists, err := ipt.Exists("filter", "FORWARD", "-s", cidr, "-j", "ACCEPT") + if err != nil { + return err + } + if !exists { + return fmt.Errorf("service CIDR source rule for %s not found", cidr) + } + + exists, err = ipt.Exists("filter", "FORWARD", "-d", cidr, "-j", "ACCEPT") + if err != nil { + return err + } + if !exists { + return fmt.Errorf("service CIDR dest rule for %s not found", cidr) + } + } + + return nil + }) + }, 2*time.Second).Should(Succeed()) + }) + }) +}) + diff --git a/go-controller/pkg/node/gateway_shared_intf.go b/go-controller/pkg/node/gateway_shared_intf.go index 35e409618b..8281099f01 100644 --- a/go-controller/pkg/node/gateway_shared_intf.go +++ b/go-controller/pkg/node/gateway_shared_intf.go @@ -1,3 +1,6 @@ +//go:build linux +// +build linux + package node import ( @@ -406,13 +409,17 @@ func (npw *nodePortWatcher) updateServiceFlowCache(service *corev1.Service, netI } // table 2, user-defined network host -> OVN towards default cluster network services defaultNetConfig := npw.ofm.defaultBridge.GetActiveNetworkBridgeConfigCopy(types.DefaultNetworkName) + outputActions := fmt.Sprintf("output:%s", defaultNetConfig.OfPortPatch) + if config.Gateway.VLANID != 0 { + outputActions = fmt.Sprintf("mod_vlan_vid:%d,%s", config.Gateway.VLANID, outputActions) + } // sample flow: cookie=0xdeff105, duration=2319.685s, table=2, n_packets=496, n_bytes=67111, priority=300, // ip,nw_dst=10.96.0.1 actions=mod_dl_dst:02:42:ac:12:00:03,output:"patch-breth0_ov" // This flow is used for UDNs and advertised UDNs to be able to reach kapi and dns services alone on default network flows := []string{fmt.Sprintf("cookie=%s, priority=300, table=2, %s, %s_dst=%s, "+ - "actions=set_field:%s->eth_dst,output:%s", + "actions=set_field:%s->eth_dst,%s", nodetypes.DefaultOpenFlowCookie, ipPrefix, ipPrefix, service.Spec.ClusterIP, - npw.ofm.getDefaultBridgeMAC().String(), defaultNetConfig.OfPortPatch)} + npw.ofm.getDefaultBridgeMAC().String(), outputActions)} if util.IsRouteAdvertisementsEnabled() { // if the network is advertised, then for the reply from kapi and dns services to go back // into the UDN's VRF we need flows that statically send this to the local port @@ -1577,8 +1584,7 @@ func newNodePortWatcher( // on the OVS bridge in the host. These flows act only on the packets coming in from outside // of the node. If someone on the node is trying to access the NodePort service, those packets // will not be processed by the OpenFlow flows, so we need to add iptable rules that DNATs the - // NodePortIP:NodePort to ClusterServiceIP:Port. We don't need to do this while - // running on DPU or on DPU-Host. + // NodePortIP:NodePort to ClusterServiceIP:Port. We don't need to do this on DPU. if config.OvnKubeNode.Mode == types.NodeModeFull { if config.Gateway.Mode == config.GatewayModeLocal { if err := initLocalGatewayIPTables(); err != nil { @@ -1601,19 +1607,8 @@ func newNodePortWatcher( } } - var subnets []*net.IPNet - for _, subnet := range config.Default.ClusterSubnets { - subnets = append(subnets, subnet.CIDR) - } - subnets = append(subnets, config.Kubernetes.ServiceCIDRs...) - if config.Gateway.DisableForwarding { - if err := initExternalBridgeServiceForwardingRules(subnets); err != nil { - return nil, fmt.Errorf("failed to add accept rules in forwarding table for bridge %s: err %v", gwBridge.GetGatewayIface(), err) - } - } else { - if err := delExternalBridgeServiceForwardingRules(subnets); err != nil { - return nil, fmt.Errorf("failed to delete accept rules in forwarding table for bridge %s: err %v", gwBridge.GetGatewayIface(), err) - } + if err := configureForwardingRules(); err != nil { + return nil, fmt.Errorf("failed to configure forwarding rules for bridge %s: %v", gwBridge.GetGatewayIface(), err) } // used to tell addServiceRules which rules to add diff --git a/go-controller/pkg/node/gateway_udn.go b/go-controller/pkg/node/gateway_udn.go index 9d3d0a75ac..a284dca0cd 100644 --- a/go-controller/pkg/node/gateway_udn.go +++ b/go-controller/pkg/node/gateway_udn.go @@ -125,13 +125,17 @@ func NewUserDefinedNetworkGateway(netInfo util.NetInfo, node *corev1.Node, nodeL return nil, fmt.Errorf("unable to dereference default node network controller gateway object") } - if gw.openflowManager == nil { + if config.OvnKubeNode.Mode != types.NodeModeDPUHost && gw.openflowManager == nil { return nil, fmt.Errorf("openflow manager has not been provided for network: %s", netInfo.GetNetworkName()) } - intfName := gw.openflowManager.defaultBridge.GetGatewayIface() + + intfName := gw.GetGatewayIface() + if intfName == "" { + return nil, fmt.Errorf("failed to get gateway interface for network: %s", netInfo.GetNetworkName()) + } link, err := util.GetNetLinkOps().LinkByName(intfName) if err != nil { - return nil, fmt.Errorf("unable to get link for %s, error: %v", intfName, err) + return nil, fmt.Errorf("unable to get link for gateway interface %s, error: %v", intfName, err) } return &UserDefinedNetworkGateway{ @@ -204,7 +208,7 @@ func (udng *UserDefinedNetworkGateway) addMarkChain() error { // AddNetwork will be responsible to create all plumbings // required by this UDN on the gateway side func (udng *UserDefinedNetworkGateway) AddNetwork() error { - if udng.openflowManager == nil { + if config.OvnKubeNode.Mode != types.NodeModeDPUHost && udng.openflowManager == nil { return fmt.Errorf("openflow manager has not been provided for network: %s", udng.NetInfo.GetNetworkName()) } // port is created first and its MAC address configured. The IP(s) on that link are added after enslaving to a VRF device (addUDNManagementPortIPs) @@ -214,81 +218,94 @@ func (udng *UserDefinedNetworkGateway) AddNetwork() error { if err != nil { return fmt.Errorf("could not create management port netdevice for network %s: %w", udng.GetNetworkName(), err) } - vrfDeviceName := util.GetNetworkVRFName(udng.NetInfo) - routes, err := udng.computeRoutesForUDN(mplink) - if err != nil { - return fmt.Errorf("failed to compute routes for network %s, err: %v", udng.GetNetworkName(), err) - } - if err = udng.vrfManager.AddVRF(vrfDeviceName, mplink.Attrs().Name, uint32(udng.vrfTableId), nil); err != nil { - return fmt.Errorf("could not add VRF %d for network %s, err: %v", udng.vrfTableId, udng.GetNetworkName(), err) - } - if err = udng.addUDNManagementPortIPs(mplink); err != nil { - return fmt.Errorf("unable to add management port IP(s) for link %s, for network %s: %w", mplink.Attrs().Name, udng.GetNetworkName(), err) - } - if err = udng.vrfManager.AddVRFRoutes(vrfDeviceName, routes); err != nil { - return fmt.Errorf("could not add VRF %s routes for network %s, err: %v", vrfDeviceName, udng.GetNetworkName(), err) + + if config.OvnKubeNode.Mode != types.NodeModeDPU { + vrfDeviceName := util.GetNetworkVRFName(udng.NetInfo) + routes, err := udng.computeRoutesForUDN(mplink) + if err != nil { + return fmt.Errorf("failed to compute routes for network %s, err: %v", udng.GetNetworkName(), err) + } + if err = udng.vrfManager.AddVRF(vrfDeviceName, mplink.Attrs().Name, uint32(udng.vrfTableId), nil); err != nil { + return fmt.Errorf("could not add VRF %d for network %s, err: %v", udng.vrfTableId, udng.GetNetworkName(), err) + } + if err = udng.addUDNManagementPortIPs(mplink); err != nil { + return fmt.Errorf("unable to add management port IP(s) for link %s, for network %s: %w", mplink.Attrs().Name, udng.GetNetworkName(), err) + } + if err = udng.vrfManager.AddVRFRoutes(vrfDeviceName, routes); err != nil { + return fmt.Errorf("could not add VRF %s routes for network %s, err: %v", vrfDeviceName, udng.GetNetworkName(), err) + } } udng.updateAdvertisementStatus() - // create the iprules for this network - if err = udng.updateUDNVRFIPRules(); err != nil { - return fmt.Errorf("failed to update IP rules for network %s: %w", udng.GetNetworkName(), err) - } - - if err = udng.updateAdvertisedUDNIsolationRules(); err != nil { - return fmt.Errorf("failed to update isolation rules for network %s: %w", udng.GetNetworkName(), err) - } + if config.OvnKubeNode.Mode != types.NodeModeDPU { + // create the iprules for this network + if err = udng.updateUDNVRFIPRules(); err != nil { + return fmt.Errorf("failed to update IP rules for network %s: %w", udng.GetNetworkName(), err) + } - if err := udng.updateUDNVRFIPRoute(); err != nil { - return fmt.Errorf("failed to update ip routes for network %s: %w", udng.GetNetworkName(), err) - } + if err = udng.updateAdvertisedUDNIsolationRules(); err != nil { + return fmt.Errorf("failed to update isolation rules for network %s: %w", udng.GetNetworkName(), err) + } - // add loose mode for rp filter on management port - mgmtPortName := util.GetNetworkScopedK8sMgmtHostIntfName(uint(udng.GetNetworkID())) - if err := addRPFilterLooseModeForManagementPort(mgmtPortName); err != nil { - return fmt.Errorf("could not set loose mode for reverse path filtering on management port %s: %v", mgmtPortName, err) - } + if err := udng.updateUDNVRFIPRoute(); err != nil { + return fmt.Errorf("failed to update ip routes for network %s: %w", udng.GetNetworkName(), err) + } - nodeSubnets, err := udng.getLocalSubnets() - var mgmtIPs []*net.IPNet - for _, subnet := range nodeSubnets { - mgmtIPs = append(mgmtIPs, udng.GetNodeManagementIP(subnet)) - } - if err != nil { - return fmt.Errorf("failed to get node subnets for network %s: %w", udng.GetNetworkName(), err) - } - if err = udng.openflowManager.addNetwork(udng.NetInfo, nodeSubnets, mgmtIPs, udng.masqCTMark, udng.pktMark, udng.v6MasqIPs, udng.v4MasqIPs); err != nil { - return fmt.Errorf("could not add network %s: %v", udng.GetNetworkName(), err) + // add loose mode for rp filter on management port + mgmtPortName := util.GetNetworkScopedK8sMgmtHostIntfName(uint(udng.GetNetworkID())) + if err := addRPFilterLooseModeForManagementPort(mgmtPortName); err != nil { + return fmt.Errorf("could not set loose mode for reverse path filtering on management port %s: %v", mgmtPortName, err) + } } - waiter := newStartupWaiterWithTimeout(waitForPatchPortTimeout) - readyFunc := func() (bool, error) { - if err := udng.openflowManager.defaultBridge.SetNetworkOfPatchPort(udng.GetNetworkName()); err != nil { - klog.V(3).Infof("Failed to set network %s's openflow ports for default bridge; error: %v", udng.GetNetworkName(), err) - return false, nil + if config.OvnKubeNode.Mode != types.NodeModeDPUHost { + nodeSubnets, err := udng.getLocalSubnets() + if err != nil { + return fmt.Errorf("failed to get node subnets for network %s: %w", udng.GetNetworkName(), err) } - if udng.openflowManager.externalGatewayBridge != nil { - if err := udng.openflowManager.externalGatewayBridge.SetNetworkOfPatchPort(udng.GetNetworkName()); err != nil { - klog.V(3).Infof("Failed to set network %s's openflow ports for secondary bridge; error: %v", udng.GetNetworkName(), err) + var mgmtIPs []*net.IPNet + for _, subnet := range nodeSubnets { + mgmtIPs = append(mgmtIPs, udng.GetNodeManagementIP(subnet)) + } + if err = udng.openflowManager.addNetwork(udng.NetInfo, nodeSubnets, mgmtIPs, udng.masqCTMark, udng.pktMark, udng.v6MasqIPs, udng.v4MasqIPs); err != nil { + return fmt.Errorf("could not add network %s: %v", udng.GetNetworkName(), err) + } + + waiter := newStartupWaiterWithTimeout(waitForPatchPortTimeout) + readyFunc := func() (bool, error) { + if err := udng.openflowManager.defaultBridge.SetNetworkOfPatchPort(udng.GetNetworkName()); err != nil { + klog.V(3).Infof("Failed to set network %s's openflow ports for default bridge; error: %v", udng.GetNetworkName(), err) return false, nil } + if udng.openflowManager.externalGatewayBridge != nil { + if err := udng.openflowManager.externalGatewayBridge.SetNetworkOfPatchPort(udng.GetNetworkName()); err != nil { + klog.V(3).Infof("Failed to set network %s's openflow ports for secondary bridge; error: %v", udng.GetNetworkName(), err) + return false, nil + } + } + return true, nil } - return true, nil - } - postFunc := func() error { + postFunc := func() error { + if err := udng.gateway.Reconcile(); err != nil { + return fmt.Errorf("failed to reconcile flows on bridge for network %s; error: %v", udng.GetNetworkName(), err) + } + return nil + } + waiter.AddWait(readyFunc, postFunc) + if err := waiter.Wait(); err != nil { + return err + } + } else { if err := udng.gateway.Reconcile(); err != nil { return fmt.Errorf("failed to reconcile flows on bridge for network %s; error: %v", udng.GetNetworkName(), err) } - return nil - } - waiter.AddWait(readyFunc, postFunc) - if err := waiter.Wait(); err != nil { - return err } - if err := udng.addMarkChain(); err != nil { - return fmt.Errorf("failed to add the service masquerade chain: %w", err) + if config.OvnKubeNode.Mode != types.NodeModeDPU { + if err := udng.addMarkChain(); err != nil { + return fmt.Errorf("failed to add the service masquerade chain: %w", err) + } } // run gateway reconciliation loop on network configuration changes @@ -305,33 +322,41 @@ func (udng *UserDefinedNetworkGateway) GetNetworkRuleMetadata() string { // the gateway side. It's considered invalid to call this instance after // DelNetwork has returned succesfully. func (udng *UserDefinedNetworkGateway) DelNetwork() error { - vrfDeviceName := util.GetNetworkVRFName(udng.NetInfo) - // delete the iprules for this network - if err := udng.ruleManager.DeleteWithMetadata(udng.GetNetworkRuleMetadata()); err != nil { - return fmt.Errorf("unable to delete iprules for network %s, err: %v", udng.GetNetworkName(), err) + if config.OvnKubeNode.Mode != types.NodeModeDPU { + vrfDeviceName := util.GetNetworkVRFName(udng.NetInfo) + // delete the iprules for this network + if err := udng.ruleManager.DeleteWithMetadata(udng.GetNetworkRuleMetadata()); err != nil { + return fmt.Errorf("unable to delete iprules for network %s, err: %v", udng.GetNetworkName(), err) + } + // delete the VRF device for this network + if err := udng.vrfManager.DeleteVRF(vrfDeviceName); err != nil { + return err + } } - // delete the VRF device for this network - if err := udng.vrfManager.DeleteVRF(vrfDeviceName); err != nil { - return err + if config.OvnKubeNode.Mode != types.NodeModeDPUHost { + // delete the openflows for this network + if udng.openflowManager != nil { + udng.openflowManager.delNetwork(udng.NetInfo) + } } - // delete the openflows for this network - if udng.openflowManager != nil { - udng.openflowManager.delNetwork(udng.NetInfo) + if udng.openflowManager != nil || config.OvnKubeNode.Mode == types.NodeModeDPUHost { if err := udng.gateway.Reconcile(); err != nil { return fmt.Errorf("failed to reconcile default gateway for network %s, err: %v", udng.GetNetworkName(), err) } } - err := udng.deleteAdvertisedUDNIsolationRules() - if err != nil { - return fmt.Errorf("failed to remove advertised UDN isolation rules for network %s: %w", udng.GetNetworkName(), err) - } + if config.OvnKubeNode.Mode != types.NodeModeDPU { + err := udng.deleteAdvertisedUDNIsolationRules() + if err != nil { + return fmt.Errorf("failed to remove advertised UDN isolation rules for network %s: %w", udng.GetNetworkName(), err) + } - if err := udng.delMarkChain(); err != nil { - return err + if err := udng.delMarkChain(); err != nil { + return err + } } // delete the management port interface for this network - err = udng.deleteUDNManagementPort() + err := udng.deleteUDNManagementPort() if err != nil { return err } @@ -795,40 +820,50 @@ func (udng *UserDefinedNetworkGateway) Reconcile() { func (udng *UserDefinedNetworkGateway) doReconcile() error { klog.Infof("Reconciling gateway with updates for UDN %s", udng.GetNetworkName()) - // shouldn't happen - if udng.openflowManager == nil || udng.openflowManager.defaultBridge == nil { - return fmt.Errorf("openflow manager with default bridge configuration has not been provided for network %s", udng.GetNetworkName()) + if config.OvnKubeNode.Mode != types.NodeModeDPUHost { + // shouldn't happen + if udng.openflowManager == nil || udng.openflowManager.defaultBridge == nil { + return fmt.Errorf("openflow manager with default bridge configuration has not been provided for network %s", udng.GetNetworkName()) + } } udng.updateAdvertisementStatus() - // update bridge configuration - netConfig := udng.openflowManager.defaultBridge.GetNetworkConfig(udng.GetNetworkName()) - if netConfig == nil { - return fmt.Errorf("missing bridge configuration for network %s", udng.GetNetworkName()) + if config.OvnKubeNode.Mode != types.NodeModeDPUHost { + // update bridge configuration + netConfig := udng.openflowManager.defaultBridge.GetNetworkConfig(udng.GetNetworkName()) + if netConfig == nil { + return fmt.Errorf("missing bridge configuration for network %s", udng.GetNetworkName()) + } + netConfig.Advertised.Store(udng.isNetworkAdvertised) } - netConfig.Advertised.Store(udng.isNetworkAdvertised) - if err := udng.updateUDNVRFIPRules(); err != nil { - return fmt.Errorf("error while updating ip rule for UDN %s: %s", udng.GetNetworkName(), err) - } + if config.OvnKubeNode.Mode != types.NodeModeDPU { + if err := udng.updateUDNVRFIPRules(); err != nil { + return fmt.Errorf("error while updating ip rule for UDN %s: %s", udng.GetNetworkName(), err) + } - if err := udng.updateUDNVRFIPRoute(); err != nil { - return fmt.Errorf("error while updating ip route for UDN %s: %s", udng.GetNetworkName(), err) + if err := udng.updateUDNVRFIPRoute(); err != nil { + return fmt.Errorf("error while updating ip route for UDN %s: %s", udng.GetNetworkName(), err) + } } - // add below OpenFlows based on the gateway mode and whether the network is advertised or not: - // table=1, n_packets=0, n_bytes=0, priority=16,ip,nw_dst=128.192.0.2 actions=LOCAL (Both gateway modes) - // table=1, n_packets=0, n_bytes=0, priority=15,ip,nw_dst=128.192.0.0/14 actions=output:3 (shared gateway mode) - // necessary service isolation flows based on whether network is advertised or not - if err := udng.openflowManager.updateBridgeFlowCache(udng.nodeIPManager.ListAddresses()); err != nil { - return fmt.Errorf("error while updating logical flow for UDN %s: %s", udng.GetNetworkName(), err) + if config.OvnKubeNode.Mode != types.NodeModeDPUHost { + // add below OpenFlows based on the gateway mode and whether the network is advertised or not: + // table=1, n_packets=0, n_bytes=0, priority=16,ip,nw_dst=128.192.0.2 actions=LOCAL (Both gateway modes) + // table=1, n_packets=0, n_bytes=0, priority=15,ip,nw_dst=128.192.0.0/14 actions=output:3 (shared gateway mode) + // necessary service isolation flows based on whether network is advertised or not + if err := udng.openflowManager.updateBridgeFlowCache(udng.nodeIPManager.ListAddresses()); err != nil { + return fmt.Errorf("error while updating logical flow for UDN %s: %s", udng.GetNetworkName(), err) + } + // let's sync these flows immediately + udng.openflowManager.requestFlowSync() } - // let's sync these flows immediately - udng.openflowManager.requestFlowSync() - if err := udng.updateAdvertisedUDNIsolationRules(); err != nil { - return fmt.Errorf("error while updating advertised UDN isolation rules for network %s: %w", udng.GetNetworkName(), err) + if config.OvnKubeNode.Mode != types.NodeModeDPU { + if err := udng.updateAdvertisedUDNIsolationRules(); err != nil { + return fmt.Errorf("error while updating advertised UDN isolation rules for network %s: %w", udng.GetNetworkName(), err) + } } return nil }