diff --git a/.gitignore b/.gitignore index 92e7455b41..c2ef215769 100644 --- a/.gitignore +++ b/.gitignore @@ -48,5 +48,5 @@ docs/_build # development files /tmp - +/graphviz /k3s-ansible diff --git a/cmd/ipam/main.go b/cmd/ipam/main.go index 79c926a05d..4daaa73a60 100644 --- a/cmd/ipam/main.go +++ b/cmd/ipam/main.go @@ -72,6 +72,7 @@ func main() { cmd.Flags().IntVar(&options.ServerOpts.Port, "port", consts.IpamPort, "The port on which to listen for incoming gRPC requests.") cmd.Flags().DurationVar(&options.ServerOpts.SyncFrequency, "interval", consts.SyncFrequency, "The interval at which the IPAM will synchronize the IPAM storage.") + cmd.Flags().BoolVar(&options.ServerOpts.GraphvizEnabled, "enable-graphviz", false, "Enable the graphviz output for the IPAM.") // Leader election flags. cmd.Flags().BoolVar(&options.EnableLeaderElection, "leader-election", false, "Enable leader election for IPAM. "+ @@ -132,7 +133,9 @@ func run(cmd *cobra.Command, _ []string) error { } } - liqoIPAM, err := ipam.New(ctx, cl, &options.ServerOpts) + liqoIPAM, err := ipam.New(ctx, cl, []string{ + "10.0.0.0/8", "192.168.0.0/16", "172.16.0.0/12", + }, &options.ServerOpts) if err != nil { return err } diff --git a/deployments/liqo/README.md b/deployments/liqo/README.md index 27e4d83ba5..e65c9af7bf 100644 --- a/deployments/liqo/README.md +++ b/deployments/liqo/README.md @@ -51,6 +51,7 @@ | ipam.external.enabled | bool | `false` | Use an external IPAM to allocate the IP addresses for the pods. Enabling it will disable the internal IPAM. | | ipam.external.url | string | `""` | The URL of the external IPAM. | | ipam.externalCIDR | string | `"10.70.0.0/16"` | The subnet used for the external CIDR. | +| ipam.graphviz | bool | `false` | Enable/Disable the generation of graphviz files inside the ipam. This feature is useful to visualize the status of the ipam. The graphviz files are stored in the /graphviz directory of the ipam pod (a file for each network pool). You can access them using "kubectl cp". | | ipam.internal.image.name | string | `"ghcr.io/liqotech/ipam"` | Image repository for the IPAM pod. | | ipam.internal.image.version | string | `""` | Custom version for the IPAM image. If not specified, the global tag is used. | | ipam.internal.pod.annotations | object | `{}` | Annotations for the IPAM pod. | @@ -63,6 +64,7 @@ | ipam.podCIDR | string | `""` | The subnet used by the pods in your cluster, in CIDR notation (e.g., 10.0.0.0/16). | | ipam.reservedSubnets | list | `[]` | List of IP subnets that do not have to be used by Liqo. Liqo can perform automatic IP address remapping when a remote cluster is peering with you, e.g., in case IP address spaces (e.g., PodCIDR) overlaps. In order to prevent IP conflicting between locally used private subnets in your infrastructure and private subnets belonging to remote clusters you need tell liqo the subnets used in your cluster. E.g if your cluster nodes belong to the 192.168.2.0/24 subnet, then you should add that subnet to the reservedSubnets. PodCIDR and serviceCIDR used in the local cluster are automatically added to the reserved list. | | ipam.serviceCIDR | string | `""` | The subnet used by the services in you cluster, in CIDR notation (e.g., 172.16.0.0/16). | +| ipam.syncInterval | string | `"2m"` | Set the interval at which the IPAM pod will synchronize it's in-memory status with the local cluster. If you want to disable the synchronization, set the interval to 0. | | metricAgent.config.timeout | object | `{"read":"30s","write":"30s"}` | Set the timeout for the metrics server. | | metricAgent.enable | bool | `true` | Enable/Disable the virtual kubelet metric agent. This component aggregates all the kubelet-related metrics (e.g., CPU, RAM, etc) collected on the nodes that are used by a remote cluster peered with you, then exporting the resulting values as a property of the virtual kubelet running on the remote cluster. | | metricAgent.image.name | string | `"ghcr.io/liqotech/metric-agent"` | Image repository for the metricAgent pod. | diff --git a/deployments/liqo/templates/liqo-ipam-deployment.yaml b/deployments/liqo/templates/liqo-ipam-deployment.yaml index 7ced18abe2..de3a126055 100644 --- a/deployments/liqo/templates/liqo-ipam-deployment.yaml +++ b/deployments/liqo/templates/liqo-ipam-deployment.yaml @@ -10,6 +10,8 @@ metadata: labels: {{- include "liqo.labels" $ipamConfig | nindent 4 }} spec: + strategy: + type: Recreate replicas: {{ .Values.ipam.internal.replicas }} selector: matchLabels: @@ -51,6 +53,7 @@ spec: args: - --pod-name=$(POD_NAME) - --port=6000 + - --interval={{ .Values.ipam.syncInterval }} {{- if $ha }} - --leader-election - --leader-election-namespace=$(POD_NAMESPACE) @@ -61,6 +64,7 @@ spec: {{- if .Values.ipam.internal.pod.extraArgs }} {{- toYaml .Values.ipam.internal.pod.extraArgs | nindent 12 }} {{- end }} + - --enable-graphviz={{ .Values.ipam.graphviz }} env: - name: POD_NAME valueFrom: @@ -71,6 +75,11 @@ spec: fieldRef: fieldPath: metadata.namespace resources: {{- toYaml .Values.ipam.internal.pod.resources | nindent 12 }} + {{- if .Values.ipam.graphviz }} + volumeMounts: + - mountPath: /graphviz + name: graphviz + {{- end }} {{- if ((.Values.common).nodeSelector) }} nodeSelector: {{- toYaml .Values.common.nodeSelector | nindent 8 }} @@ -86,5 +95,9 @@ spec: {{- if .Values.ipam.internal.pod.priorityClassName }} priorityClassName: {{ .Values.ipam.internal.pod.priorityClassName }} {{- end }} - + {{- if .Values.ipam.graphviz }} + volumes: + - name: graphviz + emptyDir: {} + {{- end }} {{- end }} diff --git a/deployments/liqo/values.yaml b/deployments/liqo/values.yaml index 15031fc8ff..9c17eb2a72 100644 --- a/deployments/liqo/values.yaml +++ b/deployments/liqo/values.yaml @@ -460,6 +460,14 @@ ipam: # Network pools are used to map a cluster network into another one in order to prevent conflicts. # Default set of network pools is: [10.0.0.0/8, 192.168.0.0/16, 172.16.0.0/12] additionalPools: [] + # -- Enable/Disable the generation of graphviz files inside the ipam. + # This feature is useful to visualize the status of the ipam. + # The graphviz files are stored in the /graphviz directory of the ipam pod (a file for each network pool). + # You can access them using "kubectl cp". + graphviz: false + # -- Set the interval at which the IPAM pod will synchronize it's in-memory status with the local cluster. + # If you want to disable the synchronization, set the interval to 0. + syncInterval: 2m crdReplicator: pod: diff --git a/pkg/ipam/core/doc.go b/pkg/ipam/core/doc.go new file mode 100644 index 0000000000..bc16e2b48a --- /dev/null +++ b/pkg/ipam/core/doc.go @@ -0,0 +1,16 @@ +// Copyright 2019-2024 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package ipamcore provides the core functionality for the IPAM service. +package ipamcore diff --git a/pkg/ipam/core/ipam.go b/pkg/ipam/core/ipam.go new file mode 100644 index 0000000000..9e3991c1bd --- /dev/null +++ b/pkg/ipam/core/ipam.go @@ -0,0 +1,225 @@ +// Copyright 2019-2024 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ipamcore + +import ( + "fmt" + "net/netip" + "slices" +) + +// Ipam represents the IPAM core structure. +type Ipam struct { + roots []node +} + +// NewIpam creates a new IPAM instance. +func NewIpam(roots, preallocated []string) (*Ipam, error) { + ipamRootsPrefixes := make([]netip.Prefix, len(roots)) + for i, root := range roots { + ipamRootsPrefixes[i] = netip.MustParsePrefix(root) + } + + ipamPreallocated := make([]netip.Prefix, len(preallocated)) + for i, prefix := range preallocated { + ipamPreallocated[i] = netip.MustParsePrefix(prefix) + } + + if err := checkRoots(ipamRootsPrefixes); err != nil { + return nil, err + } + + if err := checkPreallocated(ipamRootsPrefixes, ipamPreallocated); err != nil { + return nil, err + } + + ipamRoots := make([]node, len(roots)) + for i := range ipamRootsPrefixes { + ipamRoots[i] = newNode(ipamRootsPrefixes[i]) + } + + ipam := &Ipam{ + roots: ipamRoots, + } + + if err := ipam.preallocateNetwork(ipamRootsPrefixes, ipamPreallocated); err != nil { + return nil, err + } + + return ipam, nil +} + +// NetworkAcquire allocates a network of the given size. +// It returns the allocated network or nil if no network is available. +func (ipam *Ipam) NetworkAcquire(size int) *netip.Prefix { + for i := range ipam.roots { + if result := allocateNetwork(size, &ipam.roots[i]); result != nil { + return result + } + } + return nil +} + +// NetworkAcquireWithPrefix allocates a network with the given prefix. +// It returns the allocated network or nil if the network is not available. +func (ipam *Ipam) NetworkAcquireWithPrefix(prefix netip.Prefix) *netip.Prefix { + for i := range ipam.roots { + if result := allocateNetworkWithPrefix(prefix, &ipam.roots[i]); result != nil { + return result + } + } + return nil +} + +// NetworkRelease frees the network with the given prefix. +// It returns the freed network or nil if the network is not found. +func (ipam *Ipam) NetworkRelease(prefix netip.Prefix) *netip.Prefix { + for i := range ipam.roots { + if isPrefixChildOf(ipam.roots[i].prefix, prefix) { + if result := networkRelease(prefix, &ipam.roots[i]); result != nil { + return result + } + } + } + return nil +} + +// ListNetworks returns the list of allocated networks. +func (ipam *Ipam) ListNetworks() []netip.Prefix { + var networks []netip.Prefix + for i := range ipam.roots { + networks = append(networks, listNetworks(&ipam.roots[i])...) + } + return networks +} + +// NetworkIsAvailable checks if the network with the given prefix is allocated. +// It returns true if the network is allocated, false otherwise. +func (ipam *Ipam) NetworkIsAvailable(prefix netip.Prefix) bool { + if node := ipam.search(prefix); node != nil { + return node.acquired + } + return false +} + +// IPAcquire allocates an IP address from the given prefix. +// It returns the allocated IP address or nil if the IP address is not available. +func (ipam *Ipam) IPAcquire(prefix netip.Prefix) *netip.Addr { + if node := ipam.search(prefix); node != nil { + return node.ipAcquire() + } + return nil +} + +// IPAcquireWithAddr allocates the IP address from the given prefix. +// It returns the allocated IP address or nil if the IP address is not available. +func (ipam *Ipam) IPAcquireWithAddr(prefix netip.Prefix, addr netip.Addr) *netip.Addr { + if !prefix.Contains(addr) { + return nil + } + if node := ipam.search(prefix); node != nil { + return node.allocateIPWithAddr(addr) + } + return nil +} + +// IPRelease frees the IP address from the given prefix. +// It returns the freed IP address or nil if the IP address is not found. +func (ipam *Ipam) IPRelease(prefix netip.Prefix, addr netip.Addr) *netip.Addr { + if node := ipam.search(prefix); node != nil { + return node.ipRelease(addr) + } + return nil +} + +// ListIPs returns the list of allocated IP addresses from the given prefix. +func (ipam *Ipam) ListIPs(prefix netip.Prefix) []netip.Addr { + if node := ipam.search(prefix); node != nil { + return slices.Clone(node.listIPs()) + } + return nil +} + +// IsAllocatedIP checks if the IP address is allocated from the given prefix. +// It returns true if the IP address is allocated, false otherwise. +func (ipam *Ipam) IsAllocatedIP(prefix netip.Prefix, addr netip.Addr) bool { + if node := ipam.search(prefix); node != nil { + return node.isAllocatedIP(addr) + } + return false +} + +// ToGraphviz generates the Graphviz representation of the IPAM structure. +func (ipam *Ipam) ToGraphviz() error { + for i := range ipam.roots { + _ = i + if err := ipam.roots[i].toGraphviz(); err != nil { + return fmt.Errorf("failed to generate Graphviz representation: %w", err) + } + } + return nil +} + +func (ipam *Ipam) search(prefix netip.Prefix) *node { + for i := range ipam.roots { + if node := search(prefix, &ipam.roots[i]); node != nil { + return node + } + } + return nil +} + +func checkRoots(roots []netip.Prefix) error { + for i := range roots { + if err := checkHostBitsZero(roots[i]); err != nil { + return err + } + } + return nil +} + +func checkPreallocated(roots, preallocated []netip.Prefix) error { + var err error + for i := range preallocated { + if err = checkHostBitsZero(preallocated[i]); err != nil { + return err + } + isChild := false + for j := range roots { + if isPrefixChildOf(roots[j], preallocated[i]) { + isChild = true + break + } + isChild = false + } + if !isChild { + return fmt.Errorf("prefix %s is not a child of any root cidr", preallocated[i]) + } + } + return nil +} + +func (ipam *Ipam) preallocateNetwork(roots, prefixes []netip.Prefix) error { + for i := range prefixes { + for j := range roots { + if isPrefixChildOf(roots[j], prefixes[i]) { + if prefix := ipam.NetworkAcquireWithPrefix(prefixes[i]); prefix == nil { + return fmt.Errorf("prefix %s is not allocated", prefixes[i]) + } + } + } + } + return nil +} diff --git a/pkg/ipam/core/net.go b/pkg/ipam/core/net.go new file mode 100644 index 0000000000..2dd8fd9fd7 --- /dev/null +++ b/pkg/ipam/core/net.go @@ -0,0 +1,81 @@ +// Copyright 2019-2024 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ipamcore + +import ( + "fmt" + "net/netip" + "strings" +) + +// convertByteSliceToString converts a slice of bytes to a comma-separated string. +func convertByteSliceToString(byteSlice []byte) string { + strSlice := make([]string, len(byteSlice)) + for i, b := range byteSlice { + strSlice[i] = fmt.Sprintf("%d", b) + } + return strings.Join(strSlice, ".") +} + +// setBit sets the bit at the given position to 1. +func setBit(b, position byte) byte { + if position > 7 { + fmt.Println("Bit position out of range") + return b + } + return b | (1 << (7 - position)) +} + +func checkHostBitsZero(prefix netip.Prefix) error { + if prefix.Masked().Addr().Compare(prefix.Addr()) != 0 { + return fmt.Errorf("%s :host bits must be zero", prefix) + } + return nil +} + +func splitNetworkPrefix(prefix netip.Prefix) (left, right netip.Prefix) { + if err := checkHostBitsZero(prefix); err != nil { + panic("Host bits must be zero") + } + + bin, err := prefix.MarshalBinary() + if err != nil { + panic(err) + } + + maskLen := bin[len(bin)-1] + + left = netip.MustParsePrefix( + fmt.Sprintf("%s/%d", convertByteSliceToString(bin[:4]), maskLen+1), + ) + + byteIndex := maskLen / 8 + bitIndex := maskLen % 8 + + bin[byteIndex] = setBit(bin[byteIndex], bitIndex) + + right = netip.MustParsePrefix( + fmt.Sprintf("%s/%d", convertByteSliceToString(bin[:4]), maskLen+1), + ) + + return left, right +} + +func isPrefixChildOf(parent, child netip.Prefix) bool { + if parent.Bits() <= child.Bits() && parent.Overlaps(child) { + return true + } + return false +} diff --git a/pkg/ipam/core/node.go b/pkg/ipam/core/node.go new file mode 100644 index 0000000000..eca89722dc --- /dev/null +++ b/pkg/ipam/core/node.go @@ -0,0 +1,352 @@ +// Copyright 2019-2024 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ipamcore + +import ( + "fmt" + "math" + "net/netip" + "os" + "path/filepath" + "strings" +) + +// node represents a node in the binary tree. +type node struct { + prefix netip.Prefix + acquired bool + left *node + right *node + + ips []netip.Addr + lastip netip.Addr +} + +type nodeDirection string + +const ( + leftDirection nodeDirection = "left" + rightDirection nodeDirection = "right" +) + +func newNode(prefix netip.Prefix) node { + return node{prefix: prefix} +} + +func allocateNetwork(size int, node *node) *netip.Prefix { + if node.acquired || node.prefix.Bits() > size { + return nil + } + if node.prefix.Bits() == size { + if !node.isSplit() { + node.acquired = true + return &node.prefix + } + return nil + } + + if !node.isSplit() { + node.split() + } + + first, second := node.bestDirection() + + if prefix := allocateNetwork(size, node.next(first)); prefix != nil { + return prefix + } + return allocateNetwork(size, node.next(second)) +} + +func allocateNetworkWithPrefix(prefix netip.Prefix, node *node) *netip.Prefix { + if node.acquired || !node.prefix.Overlaps(prefix) { + return nil + } + if node.prefix.Addr().Compare(prefix.Addr()) == 0 && node.prefix.Bits() == prefix.Bits() { + if !node.acquired && node.left == nil && node.right == nil { + node.acquired = true + return &node.prefix + } + return nil + } + + if !node.isSplit() { + node.split() + } + + if node.left != nil && node.left.prefix.Overlaps(prefix) { + return allocateNetworkWithPrefix(prefix, node.left) + } + if node.right != nil && node.right.prefix.Overlaps(prefix) { + return allocateNetworkWithPrefix(prefix, node.right) + } + + return nil +} + +func networkRelease(prefix netip.Prefix, node *node) *netip.Prefix { + var result *netip.Prefix + + if node == nil { + return nil + } + + if node.prefix.Addr().Compare(prefix.Addr()) == 0 && node.prefix.Bits() == prefix.Bits() { + if node.acquired { + node.acquired = false + return &node.prefix + } + return nil + } + + if node.left != nil && node.left.prefix.Overlaps(prefix) { + result = networkRelease(prefix, node.left) + } + if node.right != nil && node.right.prefix.Overlaps(prefix) { + result = networkRelease(prefix, node.right) + } + + node.merge() + return result +} + +func listNetworks(node *node) []netip.Prefix { + if node == nil { + return nil + } + + if node.acquired { + return []netip.Prefix{node.prefix} + } + + var networks []netip.Prefix + if node.left != nil { + networks = append(networks, listNetworks(node.left)...) + } + if node.right != nil { + networks = append(networks, listNetworks(node.right)...) + } + + return networks +} + +func (n *node) isAllocatedIP(ip netip.Addr) bool { + for i := range n.ips { + if n.ips[i].Compare(ip) == 0 { + return true + } + } + return false +} + +func (n *node) ipAcquire() *netip.Addr { + if !n.acquired { + return nil + } + + size := int(math.Pow(2, float64(n.prefix.Addr().BitLen()-n.prefix.Bits()))) + + if !n.lastip.IsValid() { + n.lastip = n.prefix.Addr() + } + + addr := n.lastip + + if n.lastip.Compare(n.prefix.Addr()) != 0 { + addr = addr.Next() + } + + for i := 0; i < size; i++ { + if !n.prefix.Contains(addr) { + addr = n.prefix.Addr() + } + if !n.isAllocatedIP(addr) { + n.ips = append(n.ips, addr) + n.lastip = addr + return &addr + } + addr = addr.Next() + } + return nil +} + +func (n *node) allocateIPWithAddr(addr netip.Addr) *netip.Addr { + if !n.acquired { + return nil + } + + if !n.prefix.Contains(addr) { + return nil + } + + for i := range n.ips { + if n.ips[i].Compare(addr) == 0 { + return nil + } + } + + n.ips = append(n.ips, addr) + + return &n.ips[len(n.ips)-1] +} + +func (n *node) ipRelease(ip netip.Addr) *netip.Addr { + if !n.acquired { + return nil + } + + for i, addr := range n.ips { + if addr.Compare(ip) == 0 { + n.ips = append(n.ips[:i], n.ips[i+1:]...) + return &addr + } + } + return nil +} + +func (n *node) listIPs() []netip.Addr { + return n.ips +} + +func search(prefix netip.Prefix, node *node) *node { + if node == nil { + return nil + } + + if node.prefix.Addr().Compare(prefix.Addr()) == 0 && node.prefix.Bits() == prefix.Bits() { + return node + } + + if node.left != nil && node.left.prefix.Overlaps(prefix) { + return search(prefix, node.left) + } + if node.right != nil && node.right.prefix.Overlaps(prefix) { + return search(prefix, node.right) + } + + return nil +} + +func (n *node) split() { + if n.isSplit() { + return + } + left, right := splitNetworkPrefix(n.prefix) + n.insert(leftDirection, left) + n.insert(rightDirection, right) +} + +func (n *node) merge() { + if n.left.isLeaf() && n.right.isLeaf() && !n.left.acquired && !n.right.acquired { + n.left = nil + n.right = nil + } +} + +func (n *node) insert(nd nodeDirection, prefix netip.Prefix) { + newNode := &node{prefix: prefix} + switch nd { + case leftDirection: + n.left = newNode + return + case rightDirection: + n.right = newNode + return + default: + return + } +} + +func (n *node) bestDirection() (first, second nodeDirection) { + if n.left.isSplit() { + return leftDirection, rightDirection + } + if n.right.isSplit() { + return rightDirection, leftDirection + } + return leftDirection, rightDirection +} + +func (n *node) isSplit() bool { + if n.left != nil && n.right != nil { + return true + } + return false +} + +func (n *node) isLeaf() bool { + if n.left == nil && n.right == nil { + return true + } + return false +} + +func (n *node) next(direction nodeDirection) *node { + switch direction { + case leftDirection: + return n.left + case rightDirection: + return n.right + default: + return nil + } +} + +func (n *node) toGraphviz() error { + var sb strings.Builder + sb.WriteString("digraph G {\n") + n.toGraphvizRecursive(&sb) + sb.WriteString("}\n") + + if _, err := os.Stat("/graphviz"); os.IsNotExist(err) { + if err := os.Mkdir("/graphviz", 0o700); err != nil { + return err + } + } + + filePath := filepath.Clean("/graphviz/" + strings.NewReplacer("/", "_", ".", "_").Replace(n.prefix.String()) + ".dot") + file, err := os.Create(filePath) + if err != nil { + return err + } + defer file.Close() + + _, err = file.WriteString(sb.String()) + return err +} + +func (n *node) toGraphvizRecursive(sb *strings.Builder) { + if n == nil { + return + } + label := n.prefix.String() + if len(n.ips) > 0 { + ipsString := []string{} + for i := range n.ips { + ipsString = append(ipsString, n.ips[i].String()) + } + label += "\\n" + strings.Join(ipsString, "\\n") + } + if n.acquired { + fmt.Fprintf(sb, " %q [label=\"%s\", style=filled, color=\"#57cc99\"];\n", n.prefix, label) + } + if n.left != nil { + fmt.Fprintf(sb, " %q -> %q;\n", n.prefix, n.left.prefix) + n.left.toGraphvizRecursive(sb) + } + if n.right != nil { + fmt.Fprintf(sb, " %q -> %q;\n", n.prefix, n.right.prefix) + n.right.toGraphvizRecursive(sb) + } +} diff --git a/pkg/ipam/initialize.go b/pkg/ipam/initialize.go index 647e04e8cf..8922fa98d6 100644 --- a/pkg/ipam/initialize.go +++ b/pkg/ipam/initialize.go @@ -24,6 +24,8 @@ import ( // +kubebuilder:rbac:groups=ipam.liqo.io,resources=networks,verbs=get;list;watch func (lipam *LiqoIPAM) initialize(ctx context.Context) error { + klog.Info("Initializing IPAM") + if err := lipam.initializeNetworks(ctx); err != nil { return err } @@ -38,15 +40,14 @@ func (lipam *LiqoIPAM) initialize(ctx context.Context) error { func (lipam *LiqoIPAM) initializeNetworks(ctx context.Context) error { // List all networks present in the cluster. - nets, err := listNetworksOnCluster(ctx, lipam.Client) + nets, err := lipam.listNetworksOnCluster(ctx, lipam.Client) if err != nil { return err } // Initialize the networks. for _, net := range nets { - if err := lipam.reserveNetwork(net); err != nil { - klog.Errorf("Failed to reserve network %q: %v", net, err) + if _, err := lipam.networkAcquireSpecific(net.String()); err != nil { return err } } @@ -56,14 +57,14 @@ func (lipam *LiqoIPAM) initializeNetworks(ctx context.Context) error { func (lipam *LiqoIPAM) initializeIPs(ctx context.Context) error { // List all IPs present in the cluster. - ips, err := listIPsOnCluster(ctx, lipam.Client) + ips, err := lipam.listIPsOnCluster(ctx, lipam.Client) if err != nil { return err } // Initialize the IPs. for _, ip := range ips { - if err := lipam.reserveIP(ip); err != nil { + if err := lipam.ipAcquireWithAddr(ip.cidr, ip.ip); err != nil { klog.Errorf("Failed to reserve IP %q (network %q): %v", ip.ip, ip.cidr, err) return err } diff --git a/pkg/ipam/ipam.go b/pkg/ipam/ipam.go index 6a928f1fce..427f4d501d 100644 --- a/pkg/ipam/ipam.go +++ b/pkg/ipam/ipam.go @@ -22,40 +22,46 @@ import ( "google.golang.org/grpc/health" "google.golang.org/grpc/health/grpc_health_v1" "sigs.k8s.io/controller-runtime/pkg/client" + + ipamcore "github.com/liqotech/liqo/pkg/ipam/core" ) // LiqoIPAM is the struct implementing the IPAM interface. type LiqoIPAM struct { UnimplementedIPAMServer - HealthServer *health.Server - client.Client + IpamCore *ipamcore.Ipam + mutex sync.Mutex - opts *ServerOptions - cacheNetworks map[string]networkInfo - cacheIPs map[string]ipInfo - mutex sync.Mutex + HealthServer *health.Server + client.Client + opts *ServerOptions } // ServerOptions contains the options to configure the IPAM server. type ServerOptions struct { - Port int - SyncFrequency time.Duration + Pools []string + Port int + SyncFrequency time.Duration + GraphvizEnabled bool } // New creates a new instance of the LiqoIPAM. -func New(ctx context.Context, cl client.Client, opts *ServerOptions) (*LiqoIPAM, error) { +func New(ctx context.Context, cl client.Client, roots []string, opts *ServerOptions) (*LiqoIPAM, error) { hs := health.NewServer() hs.SetServingStatus(IPAM_ServiceDesc.ServiceName, grpc_health_v1.HealthCheckResponse_NOT_SERVING) - lipam := &LiqoIPAM{ - HealthServer: hs, + ipam, err := ipamcore.NewIpam(roots, []string{}) + if err != nil { + return nil, err + } - Client: cl, + lipam := &LiqoIPAM{ + IpamCore: ipam, - opts: opts, - cacheNetworks: make(map[string]networkInfo), - cacheIPs: make(map[string]ipInfo), + HealthServer: hs, + Client: cl, + opts: opts, } // Initialize the IPAM instance @@ -73,7 +79,7 @@ func New(ctx context.Context, cl client.Client, opts *ServerOptions) (*LiqoIPAM, // IPAcquire acquires a free IP from a given CIDR. func (lipam *LiqoIPAM) IPAcquire(_ context.Context, req *IPAcquireRequest) (*IPAcquireResponse, error) { - remappedIP, err := lipam.acquireIP(req.GetCidr()) + remappedIP, err := lipam.ipAcquire(req.GetCidr()) if err != nil { return &IPAcquireResponse{}, err } @@ -83,16 +89,35 @@ func (lipam *LiqoIPAM) IPAcquire(_ context.Context, req *IPAcquireRequest) (*IPA // IPRelease releases an IP from a given CIDR. func (lipam *LiqoIPAM) IPRelease(_ context.Context, req *IPReleaseRequest) (*IPReleaseResponse, error) { - lipam.freeIP(ipCidr{ip: req.GetIp(), cidr: req.GetCidr()}) + if err := lipam.ipRelease(ipCidr{ip: req.GetIp(), cidr: req.GetCidr()}); err != nil { + return &IPReleaseResponse{}, err + } return &IPReleaseResponse{}, nil } // NetworkAcquire acquires a network. If it is already reserved, it allocates and reserves a new free one with the same prefix length. func (lipam *LiqoIPAM) NetworkAcquire(_ context.Context, req *NetworkAcquireRequest) (*NetworkAcquireResponse, error) { - remappedCidr, err := lipam.acquireNetwork(req.GetCidr(), uint(req.GetPreAllocated()), req.GetImmutable()) - if err != nil { - return &NetworkAcquireResponse{}, err + var remappedCidr string + var err error + + if req.GetImmutable() { + remappedCidr, err = lipam.networkAcquireSpecific(req.GetCidr()) + if err != nil { + return &NetworkAcquireResponse{}, err + } + } else { + remappedCidr, err = lipam.networkAcquire(req.GetCidr()) + if err != nil { + return &NetworkAcquireResponse{}, err + } + } + + for i := 0; i < int(req.GetPreAllocated()); i++ { + _, err := lipam.ipAcquire(remappedCidr) + if err != nil { + return &NetworkAcquireResponse{}, err + } } return &NetworkAcquireResponse{Cidr: remappedCidr}, nil @@ -100,14 +125,19 @@ func (lipam *LiqoIPAM) NetworkAcquire(_ context.Context, req *NetworkAcquireRequ // NetworkRelease releases a network. func (lipam *LiqoIPAM) NetworkRelease(_ context.Context, req *NetworkReleaseRequest) (*NetworkReleaseResponse, error) { - lipam.freeNetwork(network{cidr: req.GetCidr()}) + if err := lipam.networkRelease(req.GetCidr()); err != nil { + return &NetworkReleaseResponse{}, err + } return &NetworkReleaseResponse{}, nil } // NetworkIsAvailable checks if a network is available. func (lipam *LiqoIPAM) NetworkIsAvailable(_ context.Context, req *NetworkAvailableRequest) (*NetworkAvailableResponse, error) { - available := lipam.isNetworkAvailable(network{cidr: req.GetCidr()}) + available, err := lipam.networkIsAvailable(req.GetCidr()) + if err != nil { + return &NetworkAvailableResponse{}, err + } return &NetworkAvailableResponse{Available: available}, nil } diff --git a/pkg/ipam/ips.go b/pkg/ipam/ips.go index 5784cb7ef9..0d2bc01c41 100644 --- a/pkg/ipam/ips.go +++ b/pkg/ipam/ips.go @@ -16,20 +16,15 @@ package ipam import ( "context" - "time" + "fmt" + "net/netip" - "github.com/google/nftables" klog "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" ipamv1alpha1 "github.com/liqotech/liqo/apis/ipam/v1alpha1" ) -type ipInfo struct { - ipCidr - creationTimestamp time.Time -} - type ipCidr struct { ip string cidr string @@ -39,59 +34,105 @@ func (i ipCidr) String() string { return i.ip + "-" + i.cidr } -// reserveIP reserves an IP, saving it in the cache. -func (lipam *LiqoIPAM) reserveIP(ip ipCidr) error { +// ipAcquire acquires an IP, eventually remapped if conflicts are found. +func (lipam *LiqoIPAM) ipAcquire(cidr string) (string, error) { lipam.mutex.Lock() defer lipam.mutex.Unlock() - if lipam.cacheIPs == nil { - lipam.cacheIPs = make(map[string]ipInfo) + prefix, err := netip.ParsePrefix(cidr) + if err != nil { + return "", fmt.Errorf("failed to parse CIDR %q: %w", cidr, err) } - lipam.cacheIPs[ip.String()] = ipInfo{ - ipCidr: ip, - creationTimestamp: time.Now(), + + result := lipam.IpamCore.IPAcquire(prefix) + if result == nil { + return "", fmt.Errorf("failed to reserve IP in network %q", cidr) } - klog.Infof("Reserved IP %q (network %q)", ip.ip, ip.cidr) - return nil + klog.Infof("Acquired IP %q (network %q)", result.String(), cidr) + + if lipam.opts.GraphvizEnabled { + return result.String(), lipam.IpamCore.ToGraphviz() + } + return result.String(), nil } -// acquireIP acquires an IP, eventually remapped if conflicts are found. -func (lipam *LiqoIPAM) acquireIP(cidr string) (string, error) { +// acquireIpWithAddress acquires an IP with a specific address. +func (lipam *LiqoIPAM) ipAcquireWithAddr(cidr, ip string) error { lipam.mutex.Lock() defer lipam.mutex.Unlock() - // TODO: implement real IP acquire logic - if lipam.cacheIPs == nil { - lipam.cacheIPs = make(map[string]ipInfo) - } - firstIP, _, err := nftables.NetFirstAndLastIP(cidr) + prefix, err := netip.ParsePrefix(cidr) if err != nil { - return "", err + return fmt.Errorf("failed to parse CIDR %q: %w", cidr, err) } - ip := ipCidr{ - ip: firstIP.String(), - cidr: cidr, + + addr, err := netip.ParseAddr(ip) + if err != nil { + return fmt.Errorf("failed to parse IP %q: %w", ip, err) } - lipam.cacheIPs[ip.String()] = ipInfo{ - ipCidr: ip, - creationTimestamp: time.Now(), + + result := lipam.IpamCore.IPAcquireWithAddr(prefix, addr) + if result == nil { + return fmt.Errorf("failed to reserve IP %q in network %q", ip, cidr) } - klog.Infof("Acquired IP %q (network %q)", ip.ip, ip.cidr) - return ip.ip, nil + klog.Infof("Acquired specific IP %q (%q)", result.String(), cidr) + if lipam.opts.GraphvizEnabled { + return lipam.IpamCore.ToGraphviz() + } + return nil } -// freeIP frees an IP, removing it from the cache. -func (lipam *LiqoIPAM) freeIP(ip ipCidr) { +// ipRelease frees an IP, removing it from the cache. +func (lipam *LiqoIPAM) ipRelease(ip ipCidr) error { lipam.mutex.Lock() defer lipam.mutex.Unlock() - delete(lipam.cacheIPs, ip.String()) + prefix, err := netip.ParsePrefix(ip.cidr) + if err != nil { + return fmt.Errorf("failed to parse CIDR %q: %w", ip.cidr, err) + } + addr, err := netip.ParseAddr(ip.ip) + if err != nil { + return fmt.Errorf("failed to parse IP %q: %w", ip.ip, err) + } + + result := lipam.IpamCore.IPRelease(prefix, addr) + if result == nil { + return fmt.Errorf("failed to free IP %q (network %q)", ip.ip, ip.cidr) + } klog.Infof("Freed IP %q (network %q)", ip.ip, ip.cidr) + + if lipam.opts.GraphvizEnabled { + return lipam.IpamCore.ToGraphviz() + } + return nil +} + +// isIPAvailable checks if an IP is available. +// +//nolint:unparam // This function is used for testing purposes. +func (lipam *LiqoIPAM) isIPAvailable(cidr, ip string) (bool, error) { + lipam.mutex.Lock() + defer lipam.mutex.Unlock() + + prefix, err := netip.ParsePrefix(cidr) + if err != nil { + return false, fmt.Errorf("failed to parse CIDR %q: %w", cidr, err) + } + addr, err := netip.ParseAddr(ip) + if err != nil { + return false, fmt.Errorf("failed to parse IP %q: %w", ip, err) + } + + return !lipam.IpamCore.IsAllocatedIP(prefix, addr), nil } -func listIPsOnCluster(ctx context.Context, cl client.Client) ([]ipCidr, error) { +func (lipam *LiqoIPAM) listIPsOnCluster(ctx context.Context, cl client.Client) ([]ipCidr, error) { + lipam.mutex.Lock() + defer lipam.mutex.Unlock() + var ips []ipCidr var ipList ipamv1alpha1.IPList if err := cl.List(ctx, &ipList); err != nil { diff --git a/pkg/ipam/networks.go b/pkg/ipam/networks.go index f81fd592fa..b0bbe1b839 100644 --- a/pkg/ipam/networks.go +++ b/pkg/ipam/networks.go @@ -16,7 +16,8 @@ package ipam import ( "context" - "time" + "fmt" + "net/netip" klog "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" @@ -24,87 +25,93 @@ import ( ipamv1alpha1 "github.com/liqotech/liqo/apis/ipam/v1alpha1" ) -type networkInfo struct { - network - creationTimestamp time.Time -} - -type network struct { - cidr string - preAllocated uint -} - -func (n network) String() string { - return n.cidr -} - -// reserveNetwork reserves a network, saving it in the cache. -func (lipam *LiqoIPAM) reserveNetwork(nw network) error { +// networkAcquire acquires a network, eventually remapped if conflicts are found. +func (lipam *LiqoIPAM) networkAcquire(cidr string) (string, error) { lipam.mutex.Lock() defer lipam.mutex.Unlock() - // TODO: implement real network reserve logic - if lipam.cacheNetworks == nil { - lipam.cacheNetworks = make(map[string]networkInfo) + prefix, err := netip.ParsePrefix(cidr) + if err != nil { + return "", fmt.Errorf("failed to parse CIDR %q: %w", cidr, err) } - lipam.cacheNetworks[nw.String()] = networkInfo{ - network: nw, - creationTimestamp: time.Now(), + + result := lipam.IpamCore.NetworkAcquireWithPrefix(prefix) + if result == nil { + result = lipam.IpamCore.NetworkAcquire(prefix.Bits()) + if result == nil { + return "", fmt.Errorf("failed to reserve network %q", cidr) + } } - klog.Infof("Reserved network %q", nw) - return nil + klog.Infof("Acquired network %q -> %q", cidr, result.String()) + + if lipam.opts.GraphvizEnabled { + return result.String(), lipam.IpamCore.ToGraphviz() + } + return result.String(), nil } -// acquireNetwork acquires a network, eventually remapped if conflicts are found. -func (lipam *LiqoIPAM) acquireNetwork(cidr string, preAllocated uint, immutable bool) (string, error) { +// networkAcquireSpecific acquires a network with a specific prefix. +// If the network is already allocated, it returns an error. +func (lipam *LiqoIPAM) networkAcquireSpecific(cidr string) (string, error) { lipam.mutex.Lock() defer lipam.mutex.Unlock() - // TODO: implement real network acquire logic - _ = immutable - if lipam.cacheNetworks == nil { - lipam.cacheNetworks = make(map[string]networkInfo) - } - nw := network{ - cidr: cidr, - preAllocated: preAllocated, + prefix, err := netip.ParsePrefix(cidr) + if err != nil { + return "", fmt.Errorf("failed to parse CIDR %q: %w", cidr, err) } - lipam.cacheNetworks[nw.String()] = networkInfo{ - network: nw, - creationTimestamp: time.Now(), + + result := lipam.IpamCore.NetworkAcquireWithPrefix(prefix) + if result == nil { + return "", fmt.Errorf("failed to reserve specific network %q", cidr) } - klog.Infof("Acquired network %q", nw.cidr) - return nw.cidr, nil + klog.Infof("Acquired specific network %q -> %q", cidr, result.String()) + + if lipam.opts.GraphvizEnabled { + return result.String(), lipam.IpamCore.ToGraphviz() + } + return result.String(), nil } -// freeNetwork frees a network, removing it from the cache. -func (lipam *LiqoIPAM) freeNetwork(nw network) { +// networkRelease frees a network, removing it from the cache. +func (lipam *LiqoIPAM) networkRelease(cidr string) error { lipam.mutex.Lock() defer lipam.mutex.Unlock() - // TODO: implement real network free logic - delete(lipam.cacheNetworks, nw.String()) - klog.Infof("Freed network %q", nw.cidr) + prefix, err := netip.ParsePrefix(cidr) + if err != nil { + return fmt.Errorf("failed to parse CIDR %q: %w", cidr, err) + } + + result := lipam.IpamCore.NetworkRelease(prefix) + if result == nil { + return fmt.Errorf("failed to free network %q", cidr) + } + klog.Infof("Freed network %q", cidr) + + if lipam.opts.GraphvizEnabled { + return lipam.IpamCore.ToGraphviz() + } + return nil } -// isNetworkAvailable checks if a network is available. -func (lipam *LiqoIPAM) isNetworkAvailable(nw network) bool { +// networkIsAvailable checks if a network is available. +func (lipam *LiqoIPAM) networkIsAvailable(cidr string) (bool, error) { lipam.mutex.Lock() defer lipam.mutex.Unlock() - // TODO: implement real network availability check logic - if lipam.cacheNetworks == nil { - return true + prefix, err := netip.ParsePrefix(cidr) + if err != nil { + return false, fmt.Errorf("failed to parse CIDR %q: %w", cidr, err) } - _, ok := lipam.cacheNetworks[nw.String()] - return ok + return !lipam.IpamCore.NetworkIsAvailable(prefix), nil } -func listNetworksOnCluster(ctx context.Context, cl client.Client) ([]network, error) { - var nets []network +func (lipam *LiqoIPAM) listNetworksOnCluster(ctx context.Context, cl client.Client) ([]netip.Prefix, error) { + var nets []netip.Prefix var networks ipamv1alpha1.NetworkList if err := cl.List(ctx, &networks); err != nil { return nil, err @@ -115,14 +122,15 @@ func listNetworksOnCluster(ctx context.Context, cl client.Client) ([]network, er cidr := net.Status.CIDR.String() if cidr == "" { - klog.Warningf("Network %q has no CIDR", net.Name) continue } - nets = append(nets, network{ - cidr: cidr, - preAllocated: net.Spec.PreAllocated, - }) + prefix, err := netip.ParsePrefix(cidr) + if err != nil { + return nil, fmt.Errorf("failed to parse CIDR %q: %w", cidr, err) + } + + nets = append(nets, prefix) } return nets, nil diff --git a/pkg/ipam/sync.go b/pkg/ipam/sync.go index 38de0a74d6..78f4655075 100644 --- a/pkg/ipam/sync.go +++ b/pkg/ipam/sync.go @@ -16,31 +16,37 @@ package ipam import ( "context" + "fmt" + "net/netip" "os" "time" "k8s.io/apimachinery/pkg/util/wait" klog "k8s.io/klog/v2" + + "github.com/liqotech/liqo/pkg/utils/maps" ) // +kubebuilder:rbac:groups=ipam.liqo.io,resources=ips,verbs=get;list;watch // +kubebuilder:rbac:groups=ipam.liqo.io,resources=networks,verbs=get;list;watch func (lipam *LiqoIPAM) sync(ctx context.Context, syncFrequency time.Duration) { + if syncFrequency == 0 { + klog.Info("IPAM cache sync routine disabled") + return + } + err := wait.PollUntilContextCancel(ctx, syncFrequency, false, func(ctx context.Context) (done bool, err error) { klog.Info("Started IPAM cache sync routine") - now := time.Now() - // networks created before this threshold will be removed from the cache if they are not present in the cluster. - expiredThreshold := now.Add(-syncFrequency) // Sync networks. - if err := lipam.syncNetworks(ctx, expiredThreshold); err != nil { + if err := lipam.syncNetworks(ctx); err != nil { return false, err } // Sync IPs. - if err := lipam.syncIPs(ctx, expiredThreshold); err != nil { + if err := lipam.syncIPs(ctx); err != nil { return false, err } @@ -53,61 +59,110 @@ func (lipam *LiqoIPAM) sync(ctx context.Context, syncFrequency time.Duration) { } } -func (lipam *LiqoIPAM) syncNetworks(ctx context.Context, expiredThreshold time.Time) error { +func syncNetworkAcquire(lipam *LiqoIPAM, clusterNetworks, currentNetworks map[netip.Prefix]any) error { + // Add networks that are present in the cluster but not in the cache. + for clusterNetwork := range clusterNetworks { + if _, ok := currentNetworks[clusterNetwork]; !ok { + if _, err := lipam.networkAcquireSpecific(clusterNetwork.String()); err != nil { + return fmt.Errorf("failed to acquire network %q: %w", clusterNetworks[clusterNetwork], err) + } + } + } + return nil +} + +func isNetworkInCluster(clusterNetworks map[netip.Prefix]any, currentNetwork netip.Prefix) bool { + _, ok := clusterNetworks[currentNetwork] + return ok +} + +func syncNetworkFree(lipam *LiqoIPAM, clusterNetworks, currentNetworks map[netip.Prefix]any) error { + // Remove networks that are present in the cache but not in the cluster, and were added before the threshold. + for currentNetwork := range currentNetworks { + if !isNetworkInCluster(clusterNetworks, currentNetwork) { + if err := lipam.networkRelease(currentNetwork.String()); err != nil { + return fmt.Errorf("failed to free network %q: %w", currentNetwork.String(), err) + } + } + } + return nil +} + +func (lipam *LiqoIPAM) syncNetworks(ctx context.Context) error { // List all networks present in the cluster. - clusterNetworks, err := listNetworksOnCluster(ctx, lipam.Client) + clusterNetworksSlice, err := lipam.listNetworksOnCluster(ctx, lipam.Client) if err != nil { return err } - // Create the set of networks present in the cluster (for faster lookup later). - setClusterNetworks := make(map[string]struct{}) + clusterNetworksMap := maps.SliceToMap(clusterNetworksSlice) - // Add networks that are present in the cluster but not in the cache. - for _, net := range clusterNetworks { - if _, inCache := lipam.cacheNetworks[net.String()]; !inCache { - if err := lipam.reserveNetwork(net); err != nil { - return err - } + currentNetworksMap := maps.SliceToMap(lipam.IpamCore.ListNetworks()) + + if err := syncNetworkAcquire(lipam, clusterNetworksMap, currentNetworksMap); err != nil { + return fmt.Errorf("failed to acquire network: %w", err) + } + + if err := syncNetworkFree(lipam, clusterNetworksMap, currentNetworksMap); err != nil { + return fmt.Errorf("failed to free network: %w", err) + } + + return nil +} + +func syncIPsAcquire(lipam *LiqoIPAM, clusterIPs []ipCidr) { + for i := range clusterIPs { + _ = lipam.ipAcquireWithAddr(clusterIPs[i].cidr, clusterIPs[i].ip) + } +} + +func isIPInCluster(clusterIPs []ipCidr, currentIP ipCidr) bool { + for i := range clusterIPs { + if currentIP.ip == clusterIPs[i].ip && currentIP.cidr == clusterIPs[i].cidr { + return true } - setClusterNetworks[net.String()] = struct{}{} // add network to the set } + return false +} - // Remove networks that are present in the cache but not in the cluster, and were added before the threshold. - for key := range lipam.cacheNetworks { - if _, inCluster := setClusterNetworks[key]; !inCluster && lipam.cacheNetworks[key].creationTimestamp.Before(expiredThreshold) { - lipam.freeNetwork(lipam.cacheNetworks[key].network) +func syncIPsFreeWithNetwork(lipam *LiqoIPAM, clusterIPs []ipCidr, currentIPs []netip.Addr, currentIPNetwork netip.Prefix) error { + for i := range currentIPs { + currentIP := currentIPs[i] + if !isIPInCluster(clusterIPs, ipCidr{ip: currentIP.String(), cidr: currentIPNetwork.String()}) { + if err := lipam.ipRelease(ipCidr{ip: currentIPs[i].String(), cidr: currentIPNetwork.String()}); err != nil { + return fmt.Errorf("failed to free IP %q in network %q: %w", currentIPs[i].String(), currentIPNetwork.String(), err) + } } } + return nil +} +func syncIPsFree(lipam *LiqoIPAM, clusterIPs []ipCidr, currentIPs map[netip.Prefix][]netip.Addr) error { + for currentIPsNetwork := range currentIPs { + if err := syncIPsFreeWithNetwork(lipam, clusterIPs, currentIPs[currentIPsNetwork], currentIPsNetwork); err != nil { + return fmt.Errorf("failed to free IP: %w", err) + } + } return nil } -func (lipam *LiqoIPAM) syncIPs(ctx context.Context, expiredThreshold time.Time) error { +func (lipam *LiqoIPAM) syncIPs(ctx context.Context) error { // List all IPs present in the cluster. - ips, err := listIPsOnCluster(ctx, lipam.Client) + clusterIPsList, err := lipam.listIPsOnCluster(ctx, lipam.Client) if err != nil { return err } - // Create the set of IPs present in the cluster (for faster lookup later). - setClusterIPs := make(map[string]struct{}) - - // Add IPs that are present in the cluster but not in the cache. - for _, ip := range ips { - if _, inCache := lipam.cacheIPs[ip.String()]; !inCache { - if err := lipam.reserveIP(ip); err != nil { - return err - } - } - setClusterIPs[ip.String()] = struct{}{} // add IP to the set + currentIPsMap := make(map[netip.Prefix][]netip.Addr) + currentNetworks := lipam.IpamCore.ListNetworks() + for i := range currentNetworks { + currentIPsMap[currentNetworks[i]] = lipam.IpamCore.ListIPs(currentNetworks[i]) } - // Remove IPs that are present in the cache but not in the cluster, and were added before the threshold. - for key := range lipam.cacheIPs { - if _, inCluster := setClusterIPs[key]; !inCluster && lipam.cacheIPs[key].creationTimestamp.Before(expiredThreshold) { - lipam.freeIP(lipam.cacheIPs[key].ipCidr) - } + syncIPsAcquire(lipam, clusterIPsList) + + if err := syncIPsFree(lipam, clusterIPsList, currentIPsMap); err != nil { + return fmt.Errorf("failed to free IP: %w", err) } return nil diff --git a/pkg/ipam/sync_test.go b/pkg/ipam/sync_test.go index 374efd7bac..690e1a0935 100644 --- a/pkg/ipam/sync_test.go +++ b/pkg/ipam/sync_test.go @@ -16,7 +16,6 @@ package ipam import ( "context" - "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -26,40 +25,21 @@ import ( ipamv1alpha1 "github.com/liqotech/liqo/apis/ipam/v1alpha1" networkingv1beta1 "github.com/liqotech/liqo/apis/networking/v1beta1" + ipamcore "github.com/liqotech/liqo/pkg/ipam/core" ) var _ = Describe("Sync routine tests", func() { const ( - syncFrequency = 3 * time.Second + syncFrequency = 0 testNamespace = "test" ) var ( ctx context.Context fakeClientBuilder *fake.ClientBuilder - now time.Time - newEntryThreshold time.Time - notNewTimestamp time.Time fakeIpamServer *LiqoIPAM - addNetowrkToCache = func(ipamServer *LiqoIPAM, cidr string, creationTimestamp time.Time) { - ipamServer.cacheNetworks[cidr] = networkInfo{ - network: network{ - cidr: cidr, - }, - creationTimestamp: creationTimestamp, - } - } - - addIPToCache = func(ipamServer *LiqoIPAM, ip, cidr string, creationTimestamp time.Time) { - ipC := ipCidr{ip: ip, cidr: cidr} - ipamServer.cacheIPs[ipC.String()] = ipInfo{ - ipCidr: ipC, - creationTimestamp: creationTimestamp, - } - } - newNetwork = func(name, cidr string) *ipamv1alpha1.Network { return &ipamv1alpha1.Network{ ObjectMeta: metav1.ObjectMeta{ @@ -90,14 +70,20 @@ var _ = Describe("Sync routine tests", func() { }, } } + + addNetwork = func(server *LiqoIPAM, cidr string) { + _, err := server.networkAcquireSpecific(cidr) + Expect(err).ShouldNot(HaveOccurred()) + } + + addIP = func(server *LiqoIPAM, ip, cidr string) { + Expect(server.ipAcquireWithAddr(cidr, ip)).Should(Succeed()) + } ) BeforeEach(func() { ctx = context.Background() fakeClientBuilder = fake.NewClientBuilder().WithScheme(scheme.Scheme) - now = time.Now() - newEntryThreshold = now.Add(-syncFrequency) - notNewTimestamp = newEntryThreshold.Add(-time.Minute) }) Describe("Testing the sync routine", func() { @@ -110,27 +96,34 @@ var _ = Describe("Sync routine tests", func() { newNetwork("net3", "10.2.0.0/16"), ).Build() + ipamCore, err := ipamcore.NewIpam([]string{"10.0.0.0/8"}, []string{}) + Expect(err).To(BeNil()) + // Populate the cache fakeIpamServer = &LiqoIPAM{ - Client: client, - cacheNetworks: make(map[string]networkInfo), + Client: client, + IpamCore: ipamCore, + opts: &ServerOptions{ + SyncFrequency: syncFrequency, + GraphvizEnabled: false, + }, } - addNetowrkToCache(fakeIpamServer, "10.0.0.0/16", now) - addNetowrkToCache(fakeIpamServer, "10.1.0.0/16", notNewTimestamp) - addNetowrkToCache(fakeIpamServer, "10.3.0.0/16", notNewTimestamp) - addNetowrkToCache(fakeIpamServer, "10.4.0.0/16", now) + addNetwork(fakeIpamServer, "10.0.0.0/16") + addNetwork(fakeIpamServer, "10.1.0.0/16") + addNetwork(fakeIpamServer, "10.3.0.0/16") + addNetwork(fakeIpamServer, "10.4.0.0/16") }) It("should remove networks from cache if they are not present in the cluster", func() { // Run sync - Expect(fakeIpamServer.syncNetworks(ctx, newEntryThreshold)).To(Succeed()) + Expect(fakeIpamServer.syncNetworks(ctx)).To(Succeed()) // Check the cache - Expect(fakeIpamServer.cacheNetworks).To(HaveKey("10.0.0.0/16")) // network in cluster and cache - Expect(fakeIpamServer.cacheNetworks).To(HaveKey("10.1.0.0/16")) // network in cluster and cache before new entry threshold - Expect(fakeIpamServer.cacheNetworks).To(HaveKey("10.2.0.0/16")) // network in cluster but not in cache - Expect(fakeIpamServer.cacheNetworks).NotTo(HaveKey("10.3.0.0/16")) // network not in cluster but in cache before new entry threshold - Expect(fakeIpamServer.cacheNetworks).To(HaveKey("10.4.0.0/16")) // network not in cluster but in cache after new entry threshold + Expect(fakeIpamServer.networkIsAvailable("10.0.0.0/16")).To(Equal(false)) + Expect(fakeIpamServer.networkIsAvailable("10.1.0.0/16")).To(Equal(false)) + Expect(fakeIpamServer.networkIsAvailable("10.2.0.0/16")).To(Equal(false)) + Expect(fakeIpamServer.networkIsAvailable("10.3.0.0/16")).To(Equal(true)) + Expect(fakeIpamServer.networkIsAvailable("10.4.0.0/16")).To(Equal(true)) }) }) @@ -138,37 +131,44 @@ var _ = Describe("Sync routine tests", func() { BeforeEach(func() { // Add in-cluster IPs client := fakeClientBuilder.WithObjects( + newNetwork("net1", "10.0.0.0/24"), + newIP("ip1", "10.0.0.0", "10.0.0.0/24"), newIP("ip2", "10.0.0.1", "10.0.0.0/24"), newIP("ip3", "10.0.0.2", "10.0.0.0/24"), ).Build() + ipamCore, err := ipamcore.NewIpam([]string{"10.0.0.0/8"}, []string{}) + Expect(err).To(BeNil()) + // Populate the cache fakeIpamServer = &LiqoIPAM{ Client: client, - cacheIPs: make(map[string]ipInfo), + IpamCore: ipamCore, + opts: &ServerOptions{ + SyncFrequency: syncFrequency, + GraphvizEnabled: false, + }, } - addIPToCache(fakeIpamServer, "10.0.0.0", "10.0.0.0/24", now) - addIPToCache(fakeIpamServer, "10.0.0.1", "10.0.0.0/24", notNewTimestamp) - addIPToCache(fakeIpamServer, "10.0.0.3", "10.0.0.0/24", notNewTimestamp) - addIPToCache(fakeIpamServer, "10.0.0.4", "10.0.0.0/24", now) + + addNetwork(fakeIpamServer, "10.0.0.0/24") + + addIP(fakeIpamServer, "10.0.0.0", "10.0.0.0/24") + addIP(fakeIpamServer, "10.0.0.1", "10.0.0.0/24") + addIP(fakeIpamServer, "10.0.0.3", "10.0.0.0/24") + addIP(fakeIpamServer, "10.0.0.4", "10.0.0.0/24") }) It("should remove IPs from cache if they are not present in the cluster", func() { // Run sync - Expect(fakeIpamServer.syncIPs(ctx, newEntryThreshold)).To(Succeed()) + Expect(fakeIpamServer.syncIPs(ctx)).To(Succeed()) // Check the cache - Expect(fakeIpamServer.cacheIPs).To(HaveKey( - ipCidr{ip: "10.0.0.0", cidr: "10.0.0.0/24"}.String())) // IP in cluster and cache - Expect(fakeIpamServer.cacheIPs).To(HaveKey( - ipCidr{ip: "10.0.0.1", cidr: "10.0.0.0/24"}.String())) // IP in cluster and cache before new entry threshold - Expect(fakeIpamServer.cacheIPs).To(HaveKey( - ipCidr{ip: "10.0.0.2", cidr: "10.0.0.0/24"}.String())) // IP in cluster but not in cache - Expect(fakeIpamServer.cacheIPs).NotTo(HaveKey( - ipCidr{ip: "10.0.0.3", cidr: "10.0.0.0/24"}.String())) // IP not in cluster but in cache before new entry threshold - Expect(fakeIpamServer.cacheIPs).To(HaveKey( - ipCidr{ip: "10.0.0.4", cidr: "10.0.0.0/24"}.String())) // IP not in cluster but in cache after new entry threshold + Expect(fakeIpamServer.isIPAvailable("10.0.0.0/24", "10.0.0.0")).To(Equal(false)) + Expect(fakeIpamServer.isIPAvailable("10.0.0.0/24", "10.0.0.1")).To(Equal(false)) + Expect(fakeIpamServer.isIPAvailable("10.0.0.0/24", "10.0.0.2")).To(Equal(false)) + Expect(fakeIpamServer.isIPAvailable("10.0.0.0/24", "10.0.0.3")).To(Equal(true)) + Expect(fakeIpamServer.isIPAvailable("10.0.0.0/24", "10.0.0.4")).To(Equal(true)) }) }) }) diff --git a/pkg/liqo-controller-manager/networking/external-network/remapping/ip_controller.go b/pkg/liqo-controller-manager/networking/external-network/remapping/ip_controller.go index b776634e54..385fe01350 100644 --- a/pkg/liqo-controller-manager/networking/external-network/remapping/ip_controller.go +++ b/pkg/liqo-controller-manager/networking/external-network/remapping/ip_controller.go @@ -66,14 +66,8 @@ func (r *IPReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Re deleting := !ip.DeletionTimestamp.IsZero() containsFinalizer := controllerutil.ContainsFinalizer(ip, ipMappingControllerFinalizer) - if !deleting { - if !containsFinalizer { - if err := r.ensureIPMappingFinalizerPresence(ctx, ip); err != nil { - return ctrl.Result{}, err - } - return ctrl.Result{}, nil - } - } else { + + if deleting { if containsFinalizer { if err := DeleteNatMappingIP(ctx, r.Client, ip); err != nil { return ctrl.Result{}, fmt.Errorf("unable to delete the NAT mapping for the IP %q: %w", req.NamespacedName, err) @@ -91,6 +85,13 @@ func (r *IPReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Re return ctrl.Result{}, nil } + if !containsFinalizer { + if err := r.ensureIPMappingFinalizerPresence(ctx, ip); err != nil { + return ctrl.Result{}, err + } + return ctrl.Result{}, nil + } + if err := CreateOrUpdateNatMappingIP(ctx, r.Client, ip); err != nil { return ctrl.Result{}, fmt.Errorf("unable to create or update the NAT mapping for the IP %q: %w", req.NamespacedName, err) } diff --git a/pkg/liqo-controller-manager/networking/network-controller/network_controller.go b/pkg/liqo-controller-manager/networking/network-controller/network_controller.go index 45726b0497..76a081ba90 100644 --- a/pkg/liqo-controller-manager/networking/network-controller/network_controller.go +++ b/pkg/liqo-controller-manager/networking/network-controller/network_controller.go @@ -123,9 +123,10 @@ func (r *NetworkReconciler) handleNetworkStatus(ctx context.Context, nw *ipamv1a // multiple times by checking if the status is already set. if nw.Status.CIDR == "" { desiredCIDR := nw.Spec.CIDR + preallocated := nw.Spec.PreAllocated // if the Network must not be remapped, we acquire the network specifying to the IPAM that the cidr is immutable. immutable := ipamutils.NetworkNotRemapped(nw) - remappedCIDR, err := getRemappedCIDR(ctx, r.ipamClient, desiredCIDR, immutable) + remappedCIDR, err := getRemappedCIDR(ctx, r.ipamClient, desiredCIDR, immutable, uint32(preallocated)) if err != nil { return err } diff --git a/pkg/liqo-controller-manager/networking/network-controller/utils.go b/pkg/liqo-controller-manager/networking/network-controller/utils.go index 9d9fc2c57c..4706b22f06 100644 --- a/pkg/liqo-controller-manager/networking/network-controller/utils.go +++ b/pkg/liqo-controller-manager/networking/network-controller/utils.go @@ -25,7 +25,7 @@ import ( // getRemappedCIDR returns the remapped CIDR for the given CIDR. func getRemappedCIDR(ctx context.Context, ipamClient ipam.IPAMClient, - desiredCIDR networkingv1beta1.CIDR, immutable bool) (networkingv1beta1.CIDR, error) { + desiredCIDR networkingv1beta1.CIDR, immutable bool, preallocated uint32) (networkingv1beta1.CIDR, error) { switch ipamClient.(type) { case nil: // IPAM is not enabled, use original CIDR from spec @@ -33,8 +33,9 @@ func getRemappedCIDR(ctx context.Context, ipamClient ipam.IPAMClient, default: // interact with the IPAM to retrieve the correct mapping. response, err := ipamClient.NetworkAcquire(ctx, &ipam.NetworkAcquireRequest{ - Cidr: desiredCIDR.String(), - Immutable: immutable, + Cidr: desiredCIDR.String(), + Immutable: immutable, + PreAllocated: preallocated, }) if err != nil { klog.Errorf("IPAM: error while mapping network CIDR %s: %v", desiredCIDR, err) diff --git a/pkg/utils/maps/maps.go b/pkg/utils/maps/maps.go index 55a01b6853..1b4e6e2b45 100644 --- a/pkg/utils/maps/maps.go +++ b/pkg/utils/maps/maps.go @@ -216,3 +216,12 @@ func GetNestedField(m map[string]any, path string) (any, error) { } return current, nil } + +// SliceToMap takes a slice of a generic type and returns a map where the keys are the array elements. +func SliceToMap[T comparable](slice []T) map[T]any { + result := make(map[T]any) + for _, elem := range slice { + result[elem] = nil + } + return result +}