Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions go-controller/pkg/controllermanager/node_controller_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,18 @@ func (ncm *NodeControllerManager) CleanupStaleNetworks(validNetworks ...util.Net
if !util.IsNetworkSegmentationSupportEnabled() {
return nil
}
validVRFDevices := make(sets.Set[string])
for _, network := range validNetworks {
if !network.IsPrimaryNetwork() {
continue
// in DPU mode, vrfManager would be nil
if ncm.vrfManager != nil {
validVRFDevices := make(sets.Set[string])
for _, network := range validNetworks {
if !network.IsPrimaryNetwork() {
continue
}
validVRFDevices.Insert(util.GetNetworkVRFName(network))
}
validVRFDevices.Insert(util.GetNetworkVRFName(network))
return ncm.vrfManager.Repair(validVRFDevices)
}
return ncm.vrfManager.Repair(validVRFDevices)
return nil
}

// newCommonNetworkControllerInfo creates and returns the base node network controller info
Expand Down Expand Up @@ -126,7 +130,7 @@ func NewNodeControllerManager(ovnClient *util.OVNClientset, wf factory.NodeWatch
return nil, err
}
}
if util.IsNetworkSegmentationSupportEnabled() {
if util.IsNetworkSegmentationSupportEnabled() && config.OvnKubeNode.Mode != ovntypes.NodeModeDPU {
ncm.vrfManager = vrfmanager.NewController(ncm.routeManager)
ncm.ruleManager = iprulemanager.NewController(config.IPv4Mode, config.IPv6Mode)
}
Expand Down
8 changes: 4 additions & 4 deletions go-controller/pkg/node/bridgeconfig/bridgeflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,9 +399,9 @@ func (b *BridgeConfiguration) flowsForDefaultBridge(extraIPs []net.IP) ([]string

dftFlows = append(dftFlows,
fmt.Sprintf("cookie=%s, priority=250, table=2, ip, pkt_mark=%s, "+
"actions=set_field:%s->eth_dst,output:%s",
"actions=set_field:%s->eth_dst,%soutput:%s",
nodetypes.DefaultOpenFlowCookie, netConfig.PktMark,
bridgeMacAddress, netConfig.OfPortPatch))
bridgeMacAddress, mod_vlan_id, netConfig.OfPortPatch))
}
}

Expand Down Expand Up @@ -434,9 +434,9 @@ func (b *BridgeConfiguration) flowsForDefaultBridge(extraIPs []net.IP) ([]string
nodetypes.DefaultOpenFlowCookie, netConfig.V6MasqIPs.ManagementPort.IP.String()))
dftFlows = append(dftFlows,
fmt.Sprintf("cookie=%s, priority=250, table=2, ip6, pkt_mark=%s, "+
"actions=set_field:%s->eth_dst,output:%s",
"actions=set_field:%s->eth_dst,%soutput:%s",
nodetypes.DefaultOpenFlowCookie, netConfig.PktMark,
bridgeMacAddress, netConfig.OfPortPatch))
bridgeMacAddress, mod_vlan_id, netConfig.OfPortPatch))
}
}

Expand Down
20 changes: 15 additions & 5 deletions go-controller/pkg/node/default_node_network_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func newDefaultNodeNetworkController(cnnci *CommonNodeNetworkControllerInfo, sto
routeManager: routeManager,
ovsClient: ovsClient,
}
if util.IsNetworkSegmentationSupportEnabled() {
if util.IsNetworkSegmentationSupportEnabled() && config.OvnKubeNode.Mode != types.NodeModeDPU {
c.udnHostIsolationManager = NewUDNHostIsolationManager(config.IPv4Mode, config.IPv6Mode,
cnnci.watchFactory.PodCoreInformer(), cnnci.name, cnnci.recorder)
}
Expand Down Expand Up @@ -936,6 +936,9 @@ func (nc *DefaultNodeNetworkController) Init(ctx context.Context) error {
if err != nil {
return err
}
}

if config.OvnKubeNode.Mode != types.NodeModeDPU {
if nc.udnHostIsolationManager != nil {
if err = nc.udnHostIsolationManager.Start(ctx); err != nil {
return err
Expand Down Expand Up @@ -980,9 +983,11 @@ func (nc *DefaultNodeNetworkController) Init(ctx context.Context) error {

nodeAnnotator := kube.NewNodeAnnotator(nc.Kube, node.Name)

// Use the device from environment when the DP resource name is specified.
if err := configureMgmtPortNetdevFromResource(); err != nil {
return err
if config.OvnKubeNode.Mode != types.NodeModeDPU {
// Use the device from environment when the DP resource name is specified.
if err := configureMgmtPortNetdevFromResource(); err != nil {
return err
}
}

if config.OvnKubeNode.Mode == types.NodeModeDPUHost {
Expand Down Expand Up @@ -1043,6 +1048,11 @@ func (nc *DefaultNodeNetworkController) Init(ctx context.Context) error {
return err
}
nc.Gateway = gw
} else {
err = nc.initGatewayDPUHostPreStart(nc.nodeAddress, nodeAnnotator)
if err != nil {
return err
}
}

if err := level.Set(strconv.Itoa(config.Logging.Level)); err != nil {
Expand Down Expand Up @@ -1081,7 +1091,7 @@ func (nc *DefaultNodeNetworkController) Start(ctx context.Context) error {

// Complete gateway initialization
if config.OvnKubeNode.Mode == types.NodeModeDPUHost {
err = nc.initGatewayDPUHost(nc.nodeAddress, nodeAnnotator)
err = nc.initGatewayDPUHost()
if err != nil {
return err
}
Expand Down
62 changes: 46 additions & 16 deletions go-controller/pkg/node/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,15 +429,17 @@ func gatewayInitInternal(nodeName, gwIntf, egressGatewayIntf string, gwNextHops
}
}

// Set static FDB entry for sharedGW MAC.
// If `GatewayIfaceRep` port is present, use it instead of LOCAL (bridge name).
gwport := gatewayBridge.GetBridgeName() // Default is LOCAL port for the bridge.
if repPort := gatewayBridge.GetGatewayIfaceRep(); repPort != "" { // We have an accelerated switchdev device for GW.
gwport = repPort
}
if config.OvnKubeNode.Mode != types.NodeModeDPUHost {
// Set static FDB entry for sharedGW MAC.
// If `GatewayIfaceRep` port is present, use it instead of LOCAL (bridge name).
gwport := gatewayBridge.GetBridgeName() // Default is LOCAL port for the bridge.
if repPort := gatewayBridge.GetGatewayIfaceRep(); repPort != "" { // We have an accelerated switchdev device for GW.
gwport = repPort
}

if err := util.SetStaticFDBEntry(gatewayBridge.GetBridgeName(), gwport, gatewayBridge.GetMAC()); err != nil {
return nil, nil, err
if err := util.SetStaticFDBEntry(gatewayBridge.GetBridgeName(), gwport, gatewayBridge.GetMAC()); err != nil {
return nil, nil, err
}
}

l3GwConfig := util.L3GatewayConfig{
Expand Down Expand Up @@ -466,26 +468,46 @@ func (g *gateway) GetGatewayBridgeIface() string {
}

func (g *gateway) GetGatewayIface() string {
return g.openflowManager.defaultBridge.GetGatewayIface()
if config.OvnKubeNode.Mode != types.NodeModeDPUHost {
if g.openflowManager == nil {
return ""
}
return g.openflowManager.defaultBridge.GetGatewayIface()
} else {
return config.Gateway.Interface
}
}

// SetDefaultGatewayBridgeMAC hands macAddr to the openflow manager as the new
// default bridge MAC address and logs the update.
//
// In DPU-host mode the call is a no-op: nothing is updated and nothing is logged.
func (g *gateway) SetDefaultGatewayBridgeMAC(macAddr net.HardwareAddr) {
	if config.OvnKubeNode.Mode != types.NodeModeDPUHost {
		g.openflowManager.setDefaultBridgeMAC(macAddr)
		klog.Infof("Default gateway bridge MAC address updated to %s", macAddr)
	}
}

// SetDefaultPodNetworkAdvertised stores isPodNetworkAdvertised in the Advertised
// flag of the default bridge's configuration for the default network.
//
// In DPU-host mode the call is a no-op.
func (g *gateway) SetDefaultPodNetworkAdvertised(isPodNetworkAdvertised bool) {
	if config.OvnKubeNode.Mode == types.NodeModeDPUHost {
		return
	}
	defaultNetConfig := g.openflowManager.defaultBridge.GetNetworkConfig(types.DefaultNetworkName)
	defaultNetConfig.Advertised.Store(isPodNetworkAdvertised)
}

// GetDefaultPodNetworkAdvertised returns the Advertised flag held in the default
// bridge's configuration for the default network.
//
// In DPU-host mode it always returns false without consulting the bridge.
func (g *gateway) GetDefaultPodNetworkAdvertised() bool {
	if config.OvnKubeNode.Mode != types.NodeModeDPUHost {
		defaultNetConfig := g.openflowManager.defaultBridge.GetNetworkConfig(types.DefaultNetworkName)
		return defaultNetConfig.Advertised.Load()
	}
	return false
}

// SetDefaultBridgeGARPDropFlows will enable flows to drop GARPs if the openflow
// manager has been initialized.
func (g *gateway) SetDefaultBridgeGARPDropFlows(isDropped bool) {
if config.OvnKubeNode.Mode == types.NodeModeDPUHost {
return
}

if g.openflowManager == nil {
return
}
Expand All @@ -495,14 +517,22 @@ func (g *gateway) SetDefaultBridgeGARPDropFlows(isDropped bool) {
// Reconcile handles triggering updates to different components of a gateway, like OFM, Services
func (g *gateway) Reconcile() error {
klog.Info("Reconciling gateway with updates")
if err := g.openflowManager.updateBridgeFlowCache(g.nodeIPManager.ListAddresses()); err != nil {
return err
if config.OvnKubeNode.Mode != types.NodeModeDPUHost {
if g.openflowManager != nil {
if err := g.openflowManager.updateBridgeFlowCache(g.nodeIPManager.ListAddresses()); err != nil {
return err
}
// let's sync these flows immediately
g.openflowManager.requestFlowSync()
}
}
// let's sync these flows immediately
g.openflowManager.requestFlowSync()
err := g.updateSNATRules()
if err != nil {
return err
// TBD updateSNATRules() gets node host-cidr by accessing gateway.nodeIPManager, which does not
// exist in dpu-host mode.
if config.OvnKubeNode.Mode == types.NodeModeFull {
err := g.updateSNATRules()
if err != nil {
return err
}
}
// Services create OpenFlow flows as well, need to update them all
if g.servicesRetryFramework != nil {
Expand Down
49 changes: 36 additions & 13 deletions go-controller/pkg/node/gateway_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,13 +384,9 @@ func interfaceForEXGW(intfName string) string {
return intfName
}

func (nc *DefaultNodeNetworkController) initGatewayDPUHost(kubeNodeIP net.IP, nodeAnnotator kube.Annotator) error {
// A DPU host gateway is complementary to the shared gateway running
// on the DPU embedded CPU. it performs some initializations and
// watch on services for iptable rule updates and run a loadBalancerHealth checker
// Note: all K8s Node related annotations are handled from DPU.
klog.Info("Initializing Shared Gateway Functionality on DPU host")
var err error
// TODO(adrianc): revisit if support for nodeIPManager is needed.
func (nc *DefaultNodeNetworkController) initGatewayDPUHostPreStart(kubeNodeIP net.IP, nodeAnnotator kube.Annotator) error {
klog.Info("Initializing Shared Gateway Functionality for Gateway PreStart on DPU host")

// Find the network interface that has the Kubernetes node IP assigned to it
// This interface will be used for DPU host gateway operations
Expand Down Expand Up @@ -470,18 +466,49 @@ func (nc *DefaultNodeNetworkController) initGatewayDPUHost(kubeNodeIP net.IP, no
return err
}

gw := &gateway{
if err = addHostMACBindings(kubeIntf); err != nil {
return fmt.Errorf("failed to add MAC bindings for service routing: %w", err)
}

// Configure iptables FORWARD rules for cluster and service CIDRs
// These are needed even in DPU-host mode to allow kernel forwarding
if err := configureForwardingRules(); err != nil {
return fmt.Errorf("failed to configure forwarding rules for DPU host: %v", err)
}

gatewayNextHops, _, err := getGatewayNextHops()
if err != nil {
return err
}

nc.Gateway = &gateway{
initFunc: func() error { return nil },
readyFunc: func() (bool, error) { return true, nil },
watchFactory: nc.watchFactory.(*factory.WatchFactory),
nextHops: gatewayNextHops,
}
return nil
}

// TODO(adrianc): revisit if support for nodeIPManager is needed.
func (nc *DefaultNodeNetworkController) initGatewayDPUHost() error {
// A DPU host gateway is complementary to the shared gateway running
// on the DPU embedded CPU. it performs some initializations and
// watch on services for iptable rule updates and run a loadBalancerHealth checker
// Note: all K8s Node related annotations are handled from DPU.
klog.Info("Initializing Shared Gateway Functionality for Gateway Start on DPU host")
var err error

// TODO(adrianc): revisit if support for nodeIPManager is needed.
gw := nc.Gateway.(*gateway)
if config.Gateway.NodeportEnable {
if err := initSharedGatewayIPTables(); err != nil {
return err
}
if util.IsNetworkSegmentationSupportEnabled() {
if err := configureUDNServicesNFTables(); err != nil {
return fmt.Errorf("unable to configure UDN nftables: %w", err)
}
}
gw.nodePortWatcherIptables = newNodePortWatcherIptables(nc.networkManager)
gw.loadBalancerHealthChecker = newLoadBalancerHealthChecker(nc.name, nc.watchFactory)
portClaimWatcher, err := newPortClaimWatcher(nc.recorder)
Expand All @@ -491,10 +518,6 @@ func (nc *DefaultNodeNetworkController) initGatewayDPUHost(kubeNodeIP net.IP, no
gw.portClaimWatcher = portClaimWatcher
}

if err := addHostMACBindings(kubeIntf); err != nil {
return fmt.Errorf("failed to add MAC bindings for service routing")
}

err = gw.Init(nc.stopChan, nc.wg)
nc.Gateway = gw
return err
Expand Down
4 changes: 3 additions & 1 deletion go-controller/pkg/node/gateway_init_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -925,7 +925,9 @@ func shareGatewayInterfaceDPUHostTest(app *cli.App, testNS ns.NetNS, uplinkName,

nodeAnnotator := kube.NewNodeAnnotator(k, existingNode.Name)

err := nc.initGatewayDPUHost(net.ParseIP(hostIP), nodeAnnotator)
err := nc.initGatewayDPUHostPreStart(net.ParseIP(hostIP), nodeAnnotator)
Expect(err).NotTo(HaveOccurred())
err = nc.initGatewayDPUHost()
Expect(err).NotTo(HaveOccurred())

link, err := netlink.LinkByName(uplinkName)
Expand Down
21 changes: 21 additions & 0 deletions go-controller/pkg/node/gateway_iptables.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,27 @@ func delExternalBridgeServiceForwardingRules(cidrs []*net.IPNet) error {
return deleteIptRules(getGatewayForwardRules(cidrs))
}

// configureForwardingRules adds or removes the iptables FORWARD rules covering
// the cluster subnets and the service CIDRs, depending on
// config.Gateway.DisableForwarding:
//
//   - when DisableForwarding is set, the rules are installed through
//     initExternalBridgeServiceForwardingRules;
//   - otherwise any such rules are removed through
//     delExternalBridgeServiceForwardingRules.
//
// The returned error wraps the underlying failure, so callers can inspect it
// with errors.Is / errors.As.
func configureForwardingRules() error {
	// Collect every cluster subnet CIDR followed by every service CIDR.
	subnets := make([]*net.IPNet, 0,
		len(config.Default.ClusterSubnets)+len(config.Kubernetes.ServiceCIDRs))
	for _, clusterSubnet := range config.Default.ClusterSubnets {
		subnets = append(subnets, clusterSubnet.CIDR)
	}
	subnets = append(subnets, config.Kubernetes.ServiceCIDRs...)

	if config.Gateway.DisableForwarding {
		// NOTE(review): presumably these rules keep cluster/service traffic
		// forwardable while general forwarding is disabled; confirm against
		// initExternalBridgeServiceForwardingRules.
		if err := initExternalBridgeServiceForwardingRules(subnets); err != nil {
			return fmt.Errorf("failed to add iptables FORWARD rules: %w", err)
		}
		return nil
	}

	if err := delExternalBridgeServiceForwardingRules(subnets); err != nil {
		return fmt.Errorf("failed to delete iptables FORWARD rules: %w", err)
	}
	return nil
}

func getLocalGatewayFilterRules(ifname string, cidr *net.IPNet) []nodeipt.Rule {
// Allow packets to/from the gateway interface in case defaults deny
protocol := getIPTablesProtocol(cidr.IP.String())
Expand Down
Loading