From 155d62913d81549bdb9c4c5e16947c7ebb7bafa1 Mon Sep 17 00:00:00 2001 From: Francesco Torta <62566275+fra98@users.noreply.github.com> Date: Wed, 30 Oct 2024 15:59:50 +0100 Subject: [PATCH] feat: ipam sync routine --- cmd/ipam/main.go | 12 ++--- pkg/consts/ipam.go | 6 ++- pkg/ipam/initialize.go | 21 +------- pkg/ipam/ipam.go | 38 ++++++++++---- pkg/ipam/ips.go | 60 +++++++++++++++++++++ pkg/ipam/networks.go | 51 ++++++++++++++++++ pkg/ipam/sync.go | 116 +++++++++++++++++++++++++++++++++++++++++ 7 files changed, 266 insertions(+), 38 deletions(-) create mode 100644 pkg/ipam/ips.go create mode 100644 pkg/ipam/networks.go create mode 100644 pkg/ipam/sync.go diff --git a/cmd/ipam/main.go b/cmd/ipam/main.go index 76d46a2cc4..39c01acf8d 100644 --- a/cmd/ipam/main.go +++ b/cmd/ipam/main.go @@ -22,7 +22,6 @@ import ( "github.com/spf13/cobra" "google.golang.org/grpc" - "google.golang.org/grpc/health" "google.golang.org/grpc/health/grpc_health_v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" @@ -71,6 +70,8 @@ func main() { restcfg.InitFlags(cmd.Flags()) cmd.Flags().IntVar(&options.Port, "port", consts.IpamPort, "The port on which to listen for incoming gRPC requests.") + cmd.Flags().DurationVar(&options.SyncFrequency, "interval", consts.SyncFrequency, + "The interval at which the IPAM will synchronize the IPAM storage.") cmd.Flags().BoolVar(&options.EnableLeaderElection, "leader-election", false, "Enable leader election for IPAM. "+ "Enabling this will ensure there is only one active IPAM.") cmd.Flags().StringVar(&options.LeaderElectionNamespace, "leader-election-namespace", "liqo", @@ -102,12 +103,10 @@ func run(cmd *cobra.Command, _ []string) error { // Get the rest config. cfg := restcfg.SetRateLimiter(ctrl.GetConfigOrDie()) - options.Config = cfg cl, err := client.New(cfg, client.Options{}) if err != nil { return err } - options.Client = cl if options.EnableLeaderElection { if leader, err := leaderelection.Blocking(ctx, cfg, record.NewBroadcaster(), &leaderelection.Opts{ @@ -127,10 +126,7 @@ func run(cmd *cobra.Command, _ []string) error { } } - hs := health.NewServer() - options.HealthServer = hs - - liqoIPAM, err := ipam.New(ctx, &options) + liqoIPAM, err := ipam.New(ctx, cl, cfg, &options) if err != nil { return err } @@ -143,7 +139,7 @@ func run(cmd *cobra.Command, _ []string) error { server := grpc.NewServer() // Register health service - grpc_health_v1.RegisterHealthServer(server, hs) + grpc_health_v1.RegisterHealthServer(server, liqoIPAM.HealthServer) // Register IPAM service ipam.RegisterIPAMServer(server, liqoIPAM) diff --git a/pkg/consts/ipam.go b/pkg/consts/ipam.go index 871ddb1bc5..35b45ff2f9 100644 --- a/pkg/consts/ipam.go +++ b/pkg/consts/ipam.go @@ -14,12 +14,16 @@ package consts +import "time" + // NetworkType indicates the type of Network. type NetworkType string const ( // IpamPort is the port used by the IPAM gRPC server. - IpamPort = 50051 + IpamPort = 6000 + // SyncFrequency is the frequency at which the IPAM should periodically sync its status. + SyncFrequency = 2 * time.Minute // NetworkNotRemappedLabelKey is the label key used to mark a Network that does not need CIDR remapping. NetworkNotRemappedLabelKey = "ipam.liqo.io/network-not-remapped" diff --git a/pkg/ipam/initialize.go b/pkg/ipam/initialize.go index 744e1262f9..c6ca720d98 100644 --- a/pkg/ipam/initialize.go +++ b/pkg/ipam/initialize.go @@ -26,11 +26,6 @@ import ( // +kubebuilder:rbac:groups=ipam.liqo.io,resources=ips,verbs=get;list;watch // +kubebuilder:rbac:groups=ipam.liqo.io,resources=networks,verbs=get;list;watch -type ipCidr struct { - ip string - cidr string -} - func (lipam *LiqoIPAM) initialize(ctx context.Context) error { if err := lipam.initializeNetworks(ctx); err != nil { return err @@ -81,7 +76,7 @@ func (lipam *LiqoIPAM) initializeIPs(ctx context.Context) error { func (lipam *LiqoIPAM) getReservedNetworks(ctx context.Context) ([]string, error) { var nets []string var networks ipamv1alpha1.NetworkList - if err := lipam.Options.Client.List(ctx, &networks); err != nil { + if err := lipam.Client.List(ctx, &networks); err != nil { return nil, err } @@ -109,7 +104,7 @@ func (lipam *LiqoIPAM) getReservedNetworks(ctx context.Context) ([]string, error func (lipam *LiqoIPAM) getReservedIPs(ctx context.Context) ([]ipCidr, error) { var ips []ipCidr var ipList ipamv1alpha1.IPList - if err := lipam.Options.Client.List(ctx, &ipList); err != nil { + if err := lipam.Client.List(ctx, &ipList); err != nil { return nil, err } @@ -133,15 +128,3 @@ func (lipam *LiqoIPAM) getReservedIPs(ctx context.Context) ([]ipCidr, error) { return ips, nil } - -func (lipam *LiqoIPAM) reserveNetwork(cidr string) error { - // TODO: Reserve the network. - klog.Infof("Reserved network %s", cidr) - return nil -} - -func (lipam *LiqoIPAM) reserveIP(ip, cidr string) error { - // TODO: Reserve the IP. - klog.Infof("Reserved IP %s in network %s", ip, cidr) - return nil -} diff --git a/pkg/ipam/ipam.go b/pkg/ipam/ipam.go index d91aaf700d..970c414536 100644 --- a/pkg/ipam/ipam.go +++ b/pkg/ipam/ipam.go @@ -16,6 +16,7 @@ package ipam import ( "context" + "sync" "time" "google.golang.org/grpc/health" @@ -27,15 +28,21 @@ import ( // LiqoIPAM is the struct implementing the IPAM interface. type LiqoIPAM struct { UnimplementedIPAMServer + HealthServer *health.Server + + client.Client + *rest.Config - Options *Options + options *Options + cacheNetworks map[string]networkInfo + cacheIPs map[string]ipInfo + mutex sync.Mutex } // Options contains the options to configure the IPAM. type Options struct { - Port int - Config *rest.Config - Client client.Client + Port int + SyncFrequency time.Duration EnableLeaderElection bool LeaderElectionNamespace string @@ -44,22 +51,33 @@ type Options struct { RenewDeadline time.Duration RetryPeriod time.Duration PodName string - - HealthServer *health.Server } // New creates a new instance of the LiqoIPAM. -func New(ctx context.Context, opts *Options) (*LiqoIPAM, error) { - opts.HealthServer.SetServingStatus(IPAM_ServiceDesc.ServiceName, grpc_health_v1.HealthCheckResponse_NOT_SERVING) +func New(ctx context.Context, cl client.Client, cfg *rest.Config, opts *Options) (*LiqoIPAM, error) { + hs := health.NewServer() + hs.SetServingStatus(IPAM_ServiceDesc.ServiceName, grpc_health_v1.HealthCheckResponse_NOT_SERVING) lipam := &LiqoIPAM{ - Options: opts, + HealthServer: hs, + + Config: cfg, + Client: cl, + + options: opts, + cacheNetworks: make(map[string]networkInfo), + cacheIPs: make(map[string]ipInfo), } + // Initialize the IPAM instance if err := lipam.initialize(ctx); err != nil { return nil, err } - opts.HealthServer.SetServingStatus(IPAM_ServiceDesc.ServiceName, grpc_health_v1.HealthCheckResponse_SERVING) + // Launch sync routine + go lipam.sync(ctx, opts.SyncFrequency) + + hs.SetServingStatus(IPAM_ServiceDesc.ServiceName, grpc_health_v1.HealthCheckResponse_SERVING) + return lipam, nil } diff --git a/pkg/ipam/ips.go b/pkg/ipam/ips.go new file mode 100644 index 0000000000..d852edd3c3 --- /dev/null +++ b/pkg/ipam/ips.go @@ -0,0 +1,60 @@ +// Copyright 2019-2024 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ipam + +import ( + "time" + + klog "k8s.io/klog/v2" +) + +type ipInfo struct { + ipCidr + creationTimestamp time.Time +} + +func (i *ipInfo) String() string { + return i.ipCidr.String() +} + +type ipCidr struct { + ip string + cidr string +} + +func (i *ipCidr) String() string { + return i.ip + "-" + i.cidr +} + +func (lipam *LiqoIPAM) reserveIP(ip, cidr string) error { + lipam.mutex.Lock() + defer lipam.mutex.Unlock() + + if lipam.cacheIPs == nil { + lipam.cacheIPs = make(map[string]ipInfo) + } + + // TODO: add correct logic. + ipI := ipInfo{ + ipCidr: ipCidr{ip: ip, cidr: cidr}, + creationTimestamp: time.Now(), + } + + // Save IP in cache. + lipam.cacheIPs[ipI.ipCidr.String()] = ipI + klog.Infof("Reserved IP %s in network %s", ip, cidr) + + return nil +} diff --git a/pkg/ipam/networks.go b/pkg/ipam/networks.go new file mode 100644 index 0000000000..dd78e8aea1 --- /dev/null +++ b/pkg/ipam/networks.go @@ -0,0 +1,51 @@ +// Copyright 2019-2024 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ipam + +import ( + "time" + + klog "k8s.io/klog/v2" +) + +type networkInfo struct { + cidr string + creationTimestamp time.Time +} + +func (c *networkInfo) String() string { + return c.cidr +} + +func (lipam *LiqoIPAM) reserveNetwork(cidr string) error { + lipam.mutex.Lock() + defer lipam.mutex.Unlock() + + if lipam.cacheNetworks == nil { + lipam.cacheNetworks = make(map[string]networkInfo) + } + + // TODO: add correct logic + nwI := networkInfo{ + cidr: cidr, + creationTimestamp: time.Now(), + } + + // Save network in cache. + lipam.cacheNetworks[nwI.String()] = nwI + klog.Infof("Reserved network %s", cidr) + + return nil +} diff --git a/pkg/ipam/sync.go b/pkg/ipam/sync.go new file mode 100644 index 0000000000..dbd3f304b9 --- /dev/null +++ b/pkg/ipam/sync.go @@ -0,0 +1,116 @@ +// Copyright 2019-2024 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ipam + +import ( + "context" + "os" + "time" + + "k8s.io/apimachinery/pkg/util/wait" + klog "k8s.io/klog/v2" + + ipamv1alpha1 "github.com/liqotech/liqo/apis/ipam/v1alpha1" +) + +func (lipam *LiqoIPAM) sync(ctx context.Context, syncFrequency time.Duration) { + err := wait.PollUntilContextCancel(ctx, syncFrequency, false, + func(ctx context.Context) (done bool, err error) { + lipam.mutex.Lock() + defer lipam.mutex.Unlock() + + now := time.Now() + // networks created before this threshold will be removed from the cache if they are not present in the cluster. + expiredThreshold := now.Add(-syncFrequency) + + if err := lipam.syncNetworks(ctx, expiredThreshold); err != nil { + klog.Error(err) + return false, err + } + if err := lipam.syncIPs(ctx, expiredThreshold); err != nil { + klog.Error(err) + return false, err + } + + return false, nil + }) + if err != nil { + klog.Error(err) + os.Exit(1) + } +} + +func (lipam *LiqoIPAM) syncNetworks(ctx context.Context, expiredThreshold time.Time) error { + // Set of networks present in the cluster. + nwSet := make(map[string]struct{}) + + // List all networks present in the cluster and add them to the set. + var networks ipamv1alpha1.NetworkList + if err := lipam.Client.List(ctx, &networks); err != nil { + return err + } + for i := range networks.Items { + nw := &networks.Items[i] + // If network does not have status set, skip it. + if nw.Status.CIDR != "" { + nwInfo := networkInfo{ + cidr: nw.Status.CIDR.String(), + } + nwSet[nwInfo.String()] = struct{}{} + } + } + + // Remove networks that are not present in the cache and were added before the threshold. + for key := range lipam.cacheNetworks { + if _, ok := nwSet[key]; !ok && lipam.cacheNetworks[key].creationTimestamp.Before(expiredThreshold) { + delete(lipam.cacheNetworks, key) + klog.Infof("Removed network %s from cache", key) + } + } + + return nil +} + +func (lipam *LiqoIPAM) syncIPs(ctx context.Context, expiredThreshold time.Time) error { + // Set of IPs present in the cluster. + ipSet := make(map[string]struct{}) + + // List all IPs present in the cluster and add them to the set. + var ips ipamv1alpha1.IPList + if err := lipam.Client.List(ctx, &ips); err != nil { + return err + } + for i := range ips.Items { + ip := &ips.Items[i] + if ip.Status.IP != "" && ip.Status.CIDR != "" { + ipCidr := ipInfo{ + ipCidr: ipCidr{ + ip: ip.Status.IP.String(), + cidr: ip.Status.CIDR.String()}, + } + ipSet[ipCidr.String()] = struct{}{} + } + } + + // Remove IPs that are not present in the cache and were added before the threshold. + for key := range lipam.cacheIPs { + if _, ok := ipSet[key]; !ok && lipam.cacheIPs[key].creationTimestamp.Before(expiredThreshold) { + delete(lipam.cacheIPs, key) + klog.Infof("Removed IP %s from cache", key) + } + } + + return nil +}