Skip to content

Commit

Permalink
feat: ipam timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
cheina97 committed Dec 2, 2024
1 parent b38a260 commit dd9ebae
Show file tree
Hide file tree
Showing 13 changed files with 91 additions and 50 deletions.
4 changes: 3 additions & 1 deletion cmd/ipam/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,10 @@ func main() {

// Server options.
cmd.Flags().IntVar(&options.ServerOpts.Port, "port", consts.IpamPort, "The port on which to listen for incoming gRPC requests.")
cmd.Flags().DurationVar(&options.ServerOpts.SyncFrequency, "sync-interval", consts.SyncInterval,
cmd.Flags().DurationVar(&options.ServerOpts.SyncInterval, "sync-interval", consts.SyncInterval,
"The interval at which the IPAM will synchronize the IPAM storage.")
cmd.Flags().DurationVar(&options.ServerOpts.SyncGracePeriod, "sync-graceperiod", consts.SyncGracePeriod,
"The grace period the sync routine wait before releasing an ip or a network.")
cmd.Flags().BoolVar(&options.ServerOpts.GraphvizEnabled, "enable-graphviz", false, "Enable the graphviz output for the IPAM.")
cmd.Flags().StringSliceVar(&options.ServerOpts.Pools, "pools",
[]string{"10.0.0.0/8", "192.168.0.0/16", "172.16.0.0/12"}, "The pools used by the IPAM.",
Expand Down
1 change: 1 addition & 0 deletions deployments/liqo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
| ipam.internal.pod.priorityClassName | string | `""` | PriorityClassName (https://kubernetes.io/docs/concepts/scheduling-eviction/pod-priority-preemption/#pod-priority) for the IPAM pod. |
| ipam.internal.pod.resources | object | `{"limits":{},"requests":{}}` | Resource requests and limits (https://kubernetes.io/docs/user-guide/compute-resources/) for the IPAM pod. |
| ipam.internal.replicas | int | `1` | The number of IPAM instances to run, which can be increased for active/passive high availability. |
| ipam.internal.syncGracePeriod | string | `"30s"` | |
| ipam.internal.syncInterval | string | `"2m"` | Set the interval at which the IPAM pod will synchronize it's in-memory status with the local cluster. If you want to disable the synchronization, set the interval to 0. |
| ipam.internalCIDR | string | `"10.80.0.0/16"` | The subnet used for the internal CIDR. These IPs are assigned to the Liqo internal-network interfaces. |
| ipam.podCIDR | string | `""` | The subnet used by the pods in your cluster, in CIDR notation (e.g., 10.0.0.0/16). |
Expand Down
1 change: 1 addition & 0 deletions deployments/liqo/templates/liqo-ipam-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ spec:
- --pod-name=$(POD_NAME)
- --port=6000
- --sync-interval={{ .Values.ipam.internal.syncInterval }}
- --sync-graceperiod={{ .Values.ipam.internal.syncGracePeriod }}
{{- if $ha }}
- --leader-election
- --leader-election-namespace=$(POD_NAMESPACE)
Expand Down
2 changes: 2 additions & 0 deletions deployments/liqo/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,8 @@ ipam:
# -- Set the interval at which the IPAM pod will synchronize it's in-memory status with the local cluster.
# If you want to disable the synchronization, set the interval to 0.
syncInterval: 2m
## -- Set the grace period the sync routine will wait before deleting an ip or a network.
syncGracePeriod: 30s
# -- The subnet used by the pods in your cluster, in CIDR notation (e.g., 10.0.0.0/16).
podCIDR: ""
# -- The subnet used by the services in you cluster, in CIDR notation (e.g., 172.16.0.0/16).
Expand Down
3 changes: 2 additions & 1 deletion pkg/consts/ipam.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ const (
IpamPort = 6000
// SyncInterval is the frequency at which the IPAM should periodically sync its status.
SyncInterval = 2 * time.Minute

// SyncGracePeriod is the time the IPAM sync routine should wait before performing a deletion.
SyncGracePeriod = 30 * time.Second
// NetworkNotRemappedLabelKey is the label key used to mark a Network that does not need CIDR remapping.
NetworkNotRemappedLabelKey = "ipam.liqo.io/network-not-remapped"
// NetworkNotRemappedLabelValue is the label value used to mark a Network that does not need CIDR remapping.
Expand Down
16 changes: 10 additions & 6 deletions pkg/ipam/core/ipam.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package ipamcore
import (
"fmt"
"net/netip"
"slices"
"time"
)

// Ipam represents the IPAM core structure.
Expand Down Expand Up @@ -72,10 +72,10 @@ func (ipam *Ipam) NetworkAcquireWithPrefix(prefix netip.Prefix) *netip.Prefix {

// NetworkRelease frees the network with the given prefix.
// It returns the freed network or nil if the network is not found.
func (ipam *Ipam) NetworkRelease(prefix netip.Prefix) *netip.Prefix {
func (ipam *Ipam) NetworkRelease(prefix netip.Prefix, gracePeriod time.Duration) *netip.Prefix {
for i := range ipam.roots {
if isPrefixChildOf(ipam.roots[i].prefix, prefix) {
if result := networkRelease(prefix, &ipam.roots[i]); result != nil {
if result := networkRelease(prefix, &ipam.roots[i], gracePeriod); result != nil {
return result
}
}
Expand Down Expand Up @@ -137,13 +137,13 @@ func (ipam *Ipam) IPAcquireWithAddr(prefix netip.Prefix, addr netip.Addr) (*neti

// IPRelease frees the IP address from the given prefix.
// It returns the freed IP address or nil if the IP address is not found.
func (ipam *Ipam) IPRelease(prefix netip.Prefix, addr netip.Addr) (*netip.Addr, error) {
func (ipam *Ipam) IPRelease(prefix netip.Prefix, addr netip.Addr, gracePeriod time.Duration) (*netip.Addr, error) {
node, err := ipam.search(prefix)
if err != nil {
return nil, err
}
if node != nil {
return node.ipRelease(addr), nil
return node.ipRelease(addr, gracePeriod), nil
}
return nil, nil
}
Expand All @@ -155,7 +155,11 @@ func (ipam *Ipam) ListIPs(prefix netip.Prefix) ([]netip.Addr, error) {
return nil, err
}
if node != nil {
return slices.Clone(node.ips), nil
addrs := make([]netip.Addr, len(node.ips))
for i := range node.ips {
addrs[i] = node.ips[i].addr
}
return addrs, nil
}
return nil, nil
}
Expand Down
75 changes: 51 additions & 24 deletions pkg/ipam/core/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,25 @@ import (
"os"
"path/filepath"
"strings"
"time"
)

// nodeIP represents an IP address acquired by a node.
type nodeIP struct {
addr netip.Addr
creationTimestamp time.Time
}

// node represents a node in the binary tree.
type node struct {
lastUpdate time.Time

prefix netip.Prefix
acquired bool
left *node
right *node

ips []netip.Addr
ips []nodeIP
lastip netip.Addr
}

Expand All @@ -42,7 +51,7 @@ const (
)

func newNode(prefix netip.Prefix) node {
return node{prefix: prefix}
return node{prefix: prefix, lastUpdate: time.Now()}
}

func allocateNetwork(size int, node *node) *netip.Prefix {
Expand All @@ -52,6 +61,7 @@ func allocateNetwork(size int, node *node) *netip.Prefix {
if node.prefix.Bits() == size {
if !node.isSplitted() {
node.acquired = true
node.lastUpdate = time.Now()
return &node.prefix
}
return nil
Expand All @@ -76,6 +86,7 @@ func allocateNetworkWithPrefix(prefix netip.Prefix, node *node) *netip.Prefix {
if node.prefix.Addr().Compare(prefix.Addr()) == 0 && node.prefix.Bits() == prefix.Bits() {
if !node.acquired && node.left == nil && node.right == nil {
node.acquired = true
node.lastUpdate = time.Now()
return &node.prefix
}
return nil
Expand All @@ -95,29 +106,30 @@ func allocateNetworkWithPrefix(prefix netip.Prefix, node *node) *netip.Prefix {
return nil
}

func networkRelease(prefix netip.Prefix, node *node) *netip.Prefix {
func networkRelease(prefix netip.Prefix, node *node, gracePeriod time.Duration) *netip.Prefix {
var result *netip.Prefix

if node == nil {
return nil
}

if node.prefix.Addr().Compare(prefix.Addr()) == 0 && node.prefix.Bits() == prefix.Bits() {
if node.prefix.Addr().Compare(prefix.Addr()) == 0 && node.prefix.Bits() == prefix.Bits() && node.lastUpdate.Add(gracePeriod).Before(time.Now()) {
if node.acquired {
node.acquired = false
node.lastUpdate = time.Now()
return &node.prefix
}
return nil
}

if node.left != nil && node.left.prefix.Overlaps(prefix) {
result = networkRelease(prefix, node.left)
result = networkRelease(prefix, node.left, gracePeriod)
}
if node.right != nil && node.right.prefix.Overlaps(prefix) {
result = networkRelease(prefix, node.right)
result = networkRelease(prefix, node.right, gracePeriod)
}

node.merge()
node.merge(gracePeriod)
return result
}

Expand All @@ -143,7 +155,7 @@ func listNetworks(node *node) []netip.Prefix {

func (n *node) isAllocatedIP(ip netip.Addr) bool {
for i := range n.ips {
if n.ips[i].Compare(ip) == 0 {
if n.ips[i].addr.Compare(ip) == 0 {
return true
}
}
Expand Down Expand Up @@ -175,8 +187,9 @@ func (n *node) ipAcquire() *netip.Addr {
addr = n.prefix.Addr()
}
if !n.isAllocatedIP(addr) {
n.ips = append(n.ips, addr)
n.ips = append(n.ips, nodeIP{addr: addr, creationTimestamp: time.Now()})
n.lastip = addr
n.lastUpdate = time.Now()
return &addr
}
addr = addr.Next()
Expand All @@ -194,25 +207,30 @@ func (n *node) allocateIPWithAddr(addr netip.Addr) *netip.Addr {
}

for i := range n.ips {
if n.ips[i].Compare(addr) == 0 {
if n.ips[i].addr.Compare(addr) == 0 {
return nil
}
}

n.ips = append(n.ips, addr)
n.ips = append(n.ips, nodeIP{addr: addr, creationTimestamp: time.Now()})
n.lastUpdate = time.Now()

return &n.ips[len(n.ips)-1]
return &n.ips[len(n.ips)-1].addr
}

func (n *node) ipRelease(ip netip.Addr) *netip.Addr {
func (n *node) ipRelease(ip netip.Addr, gracePeriod time.Duration) *netip.Addr {
if !n.acquired {
return nil
}

for i, addr := range n.ips {
if addr.Compare(ip) == 0 {
for i, nodeIP := range n.ips {
if !nodeIP.creationTimestamp.Add(gracePeriod).Before(time.Now()) {
continue
}
if nodeIP.addr.Compare(ip) == 0 {
n.ips = append(n.ips[:i], n.ips[i+1:]...)
return &addr
n.lastUpdate = time.Now()
return &nodeIP.addr
}
}
return nil
Expand Down Expand Up @@ -246,21 +264,30 @@ func (n *node) split() {
n.insert(rightDirection, right)
}

func (n *node) merge() {
if n.left.isLeaf() && n.right.isLeaf() && !n.left.acquired && !n.right.acquired {
n.left = nil
n.right = nil
func (n *node) merge(gracePeriod time.Duration) {
if !n.left.lastUpdate.Add(gracePeriod).Before(time.Now()) || !n.right.lastUpdate.Add(gracePeriod).Before(time.Now()) {
return // grace period not expired
}
if !n.left.isLeaf() || !n.right.isLeaf() {
return
}
if n.left.acquired || n.right.acquired {
return
}

n.left = nil
n.right = nil
n.lastUpdate = time.Now()
}

func (n *node) insert(nd nodeDirection, prefix netip.Prefix) {
newNode := &node{prefix: prefix}
newNode := newNode(prefix)
switch nd {
case leftDirection:
n.left = newNode
n.left = &newNode
return
case rightDirection:
n.right = newNode
n.right = &newNode
return
default:
return
Expand Down Expand Up @@ -333,7 +360,7 @@ func (n *node) toGraphvizRecursive(sb *strings.Builder) {
if len(n.ips) > 0 {
ipsString := []string{}
for i := range n.ips {
ipsString = append(ipsString, n.ips[i].String())
ipsString = append(ipsString, n.ips[i].addr.String())
}
label += "\\n" + strings.Join(ipsString, "\\n")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ipam/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (lipam *LiqoIPAM) initializeNetworks(ctx context.Context) error {
}
for i := 0; i < int(netdetails.preallocated); i++ {
if _, err := lipam.ipAcquire(net); err != nil {
return errors.Join(err, lipam.networkRelease(net))
return errors.Join(err, lipam.networkRelease(net, 0))
}
}
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/ipam/ipam.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ type LiqoIPAM struct {
type ServerOptions struct {
Pools []string
Port int
SyncFrequency time.Duration
SyncInterval time.Duration
SyncGracePeriod time.Duration
GraphvizEnabled bool
}

Expand Down Expand Up @@ -74,7 +75,7 @@ func New(ctx context.Context, cl client.Client, roots []string, opts *ServerOpti
}

// Launch sync routine
go lipam.sync(ctx, opts.SyncFrequency)
go lipam.sync(ctx, opts.SyncInterval)

hs.SetServingStatus(IPAM_ServiceDesc.ServiceName, grpc_health_v1.HealthCheckResponse_SERVING)

Expand Down Expand Up @@ -118,7 +119,7 @@ func (lipam *LiqoIPAM) IPRelease(_ context.Context, req *IPReleaseRequest) (*IPR
return &IPReleaseResponse{}, fmt.Errorf("failed to parse prefix %q: %w", req.GetCidr(), err)
}

if err := lipam.ipRelease(addr, prefix); err != nil {
if err := lipam.ipRelease(addr, prefix, 0); err != nil {
return &IPReleaseResponse{}, err
}

Expand Down Expand Up @@ -157,7 +158,7 @@ func (lipam *LiqoIPAM) NetworkAcquire(_ context.Context, req *NetworkAcquireRequ
for i := 0; i < int(req.GetPreAllocated()); i++ {
_, err := lipam.ipAcquire(*remappedCidr)
if err != nil {
return &NetworkAcquireResponse{}, errors.Join(err, lipam.networkRelease(*remappedCidr))
return &NetworkAcquireResponse{}, errors.Join(err, lipam.networkRelease(*remappedCidr, 0))
}
}

Expand All @@ -174,7 +175,7 @@ func (lipam *LiqoIPAM) NetworkRelease(_ context.Context, req *NetworkReleaseRequ
return &NetworkReleaseResponse{}, fmt.Errorf("failed to parse prefix %q: %w", req.GetCidr(), err)
}

if err := lipam.networkRelease(prefix); err != nil {
if err := lipam.networkRelease(prefix, 0); err != nil {
return &NetworkReleaseResponse{}, err
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/ipam/ips.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"net/netip"
"time"

klog "k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -61,13 +62,13 @@ func (lipam *LiqoIPAM) ipAcquireWithAddr(addr netip.Addr, prefix netip.Prefix) e
}

// ipRelease frees an IP, removing it from the cache.
func (lipam *LiqoIPAM) ipRelease(addr netip.Addr, prefix netip.Prefix) error {
result, err := lipam.IpamCore.IPRelease(prefix, addr)
func (lipam *LiqoIPAM) ipRelease(addr netip.Addr, prefix netip.Prefix, gracePeriod time.Duration) error {
result, err := lipam.IpamCore.IPRelease(prefix, addr, gracePeriod)
if err != nil {
return fmt.Errorf("error freeing IP %q (network %q): %w", addr.String(), prefix.String(), err)
}
if result == nil {
klog.Infof("IP %q (network %q) already freed", addr.String(), prefix.String())
klog.Infof("IP %q (network %q) already freed or grace period not over", addr.String(), prefix.String())
return nil
}
klog.Infof("Freed IP %q (network %q)", addr.String(), prefix.String())
Expand Down
7 changes: 4 additions & 3 deletions pkg/ipam/networks.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"net/netip"
"time"

klog "k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -60,10 +61,10 @@ func (lipam *LiqoIPAM) networkAcquireSpecific(prefix netip.Prefix) (*netip.Prefi
}

// networkRelease frees a network, removing it from the cache.
func (lipam *LiqoIPAM) networkRelease(prefix netip.Prefix) error {
result := lipam.IpamCore.NetworkRelease(prefix)
func (lipam *LiqoIPAM) networkRelease(prefix netip.Prefix, gracePeriod time.Duration) error {
result := lipam.IpamCore.NetworkRelease(prefix, gracePeriod)
if result == nil {
klog.Infof("Network %q already freed", prefix.String())
klog.Infof("Network %q already freed or grace period not over", prefix.String())
return nil
}
klog.Infof("Freed network %q", prefix.String())
Expand Down
Loading

0 comments on commit dd9ebae

Please sign in to comment.