Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IPAM] Sync routine #2798

Merged
merged 1 commit into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions cmd/ipam/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/spf13/cobra"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -46,7 +45,8 @@ import (
const leaderElectorName = "liqo-ipam-leaderelection"

var (
scheme = runtime.NewScheme()
scheme = runtime.NewScheme()
options ipam.Options
)

func init() {
Expand All @@ -59,8 +59,6 @@ func init() {
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;update;patch

var options ipam.Options

func main() {
var cmd = cobra.Command{
Use: "liqo-ipam",
Expand All @@ -70,7 +68,12 @@ func main() {
flagsutils.InitKlogFlags(cmd.Flags())
restcfg.InitFlags(cmd.Flags())

cmd.Flags().IntVar(&options.Port, "port", consts.IpamPort, "The port on which to listen for incoming gRPC requests.")
// Server options.
cmd.Flags().IntVar(&options.ServerOpts.Port, "port", consts.IpamPort, "The port on which to listen for incoming gRPC requests.")
cmd.Flags().DurationVar(&options.ServerOpts.SyncFrequency, "interval", consts.SyncFrequency,
"The interval at which the IPAM will synchronize the IPAM storage.")

// Leader election flags.
cmd.Flags().BoolVar(&options.EnableLeaderElection, "leader-election", false, "Enable leader election for IPAM. "+
"Enabling this will ensure there is only one active IPAM.")
cmd.Flags().StringVar(&options.LeaderElectionNamespace, "leader-election-namespace", consts.DefaultLiqoNamespace,
Expand Down Expand Up @@ -102,14 +105,14 @@ func run(cmd *cobra.Command, _ []string) error {

// Get the rest config.
cfg := restcfg.SetRateLimiter(ctrl.GetConfigOrDie())
options.Config = cfg
fra98 marked this conversation as resolved.
Show resolved Hide resolved

// Get the client.
cl, err := client.New(cfg, client.Options{
Scheme: scheme,
})
if err != nil {
return err
}
options.Client = cl

if options.EnableLeaderElection {
if leader, err := leaderelection.Blocking(ctx, cfg, record.NewBroadcaster(), &leaderelection.Opts{
Expand All @@ -129,23 +132,20 @@ func run(cmd *cobra.Command, _ []string) error {
}
}

hs := health.NewServer()
options.HealthServer = hs

liqoIPAM, err := ipam.New(ctx, &options)
liqoIPAM, err := ipam.New(ctx, cl, &options.ServerOpts)
if err != nil {
return err
}

lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", options.Port))
lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", options.ServerOpts.Port))
if err != nil {
return err
}

server := grpc.NewServer()

// Register health service
grpc_health_v1.RegisterHealthServer(server, hs)
grpc_health_v1.RegisterHealthServer(server, liqoIPAM.HealthServer)

// Register IPAM service
ipam.RegisterIPAMServer(server, liqoIPAM)
Expand Down
6 changes: 5 additions & 1 deletion pkg/consts/ipam.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@

package consts

import "time"

// NetworkType indicates the type of Network.
type NetworkType string

const (
// IpamPort is the port used by the IPAM gRPC server.
IpamPort = 50051
IpamPort = 6000
// SyncFrequency is the frequency at which the IPAM should periodically sync its status.
SyncFrequency = 2 * time.Minute

// NetworkNotRemappedLabelKey is the label key used to mark a Network that does not need CIDR remapping.
NetworkNotRemappedLabelKey = "ipam.liqo.io/network-not-remapped"
Expand Down
92 changes: 9 additions & 83 deletions pkg/ipam/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,11 @@ import (
"context"

klog "k8s.io/klog/v2"

ipamv1alpha1 "github.com/liqotech/liqo/apis/ipam/v1alpha1"
"github.com/liqotech/liqo/pkg/consts"
)

// +kubebuilder:rbac:groups=ipam.liqo.io,resources=ips,verbs=get;list;watch
// +kubebuilder:rbac:groups=ipam.liqo.io,resources=networks,verbs=get;list;watch

type ipCidr struct {
ip string
cidr string
}

func (lipam *LiqoIPAM) initialize(ctx context.Context) error {
if err := lipam.initializeNetworks(ctx); err != nil {
return err
Expand All @@ -45,15 +37,16 @@ func (lipam *LiqoIPAM) initialize(ctx context.Context) error {
}

func (lipam *LiqoIPAM) initializeNetworks(ctx context.Context) error {
// Initialize the networks.
nets, err := lipam.getReservedNetworks(ctx)
// List all networks present in the cluster.
nets, err := listNetworksOnCluster(ctx, lipam.Client)
if err != nil {
return err
}

// Initialize the networks.
for _, net := range nets {
if err := lipam.reserveNetwork(net); err != nil {
klog.Errorf("Failed to reserve network %s: %v", net, err)
klog.Errorf("Failed to reserve network %q: %v", net, err)
return err
}
}
Expand All @@ -62,86 +55,19 @@ func (lipam *LiqoIPAM) initializeNetworks(ctx context.Context) error {
}

func (lipam *LiqoIPAM) initializeIPs(ctx context.Context) error {
// Initialize the IPs.
ips, err := lipam.getReservedIPs(ctx)
// List all IPs present in the cluster.
ips, err := listIPsOnCluster(ctx, lipam.Client)
if err != nil {
return err
}

// Initialize the IPs.
for _, ip := range ips {
if err := lipam.reserveIP(ip.ip, ip.cidr); err != nil {
klog.Errorf("Failed to reserve IP %s in network %s: %v", ip.ip, ip.cidr, err)
if err := lipam.reserveIP(ip); err != nil {
klog.Errorf("Failed to reserve IP %q (network %q): %v", ip.ip, ip.cidr, err)
return err
}
}

return nil
}

func (lipam *LiqoIPAM) getReservedNetworks(ctx context.Context) ([]string, error) {
var nets []string
var networks ipamv1alpha1.NetworkList
if err := lipam.Options.Client.List(ctx, &networks); err != nil {
return nil, err
}

for i := range networks.Items {
net := &networks.Items[i]

var cidr string
switch {
case net.Labels != nil && net.Labels[consts.NetworkNotRemappedLabelKey] == consts.NetworkNotRemappedLabelValue:
cidr = net.Spec.CIDR.String()
default:
cidr = net.Status.CIDR.String()
}
if cidr == "" {
klog.Warningf("Network %s has no CIDR", net.Name)
continue
}

nets = append(nets, cidr)
}

return nets, nil
}

func (lipam *LiqoIPAM) getReservedIPs(ctx context.Context) ([]ipCidr, error) {
var ips []ipCidr
var ipList ipamv1alpha1.IPList
if err := lipam.Options.Client.List(ctx, &ipList); err != nil {
return nil, err
}

for i := range ipList.Items {
ip := &ipList.Items[i]

address := ip.Status.IP.String()
if address == "" {
klog.Warningf("IP %s has no address", ip.Name)
continue
}

cidr := ip.Status.CIDR.String()
if cidr == "" {
klog.Warningf("IP %s has no CIDR", ip.Name)
continue
}

ips = append(ips, ipCidr{ip: address, cidr: cidr})
}

return ips, nil
}

func (lipam *LiqoIPAM) reserveNetwork(cidr string) error {
// TODO: Reserve the network.
klog.Infof("Reserved network %s", cidr)
return nil
}

func (lipam *LiqoIPAM) reserveIP(ip, cidr string) error {
// TODO: Reserve the IP.
klog.Infof("Reserved IP %s in network %s", ip, cidr)
return nil
}
49 changes: 28 additions & 21 deletions pkg/ipam/ipam.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,51 +16,58 @@ package ipam

import (
"context"
"sync"
"time"

"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// LiqoIPAM is the struct implementing the IPAM interface.
type LiqoIPAM struct {
UnimplementedIPAMServer
HealthServer *health.Server

Options *Options
}

// Options contains the options to configure the IPAM.
type Options struct {
Port int
Config *rest.Config
Client client.Client
client.Client

EnableLeaderElection bool
LeaderElectionNamespace string
LeaderElectionName string
LeaseDuration time.Duration
RenewDeadline time.Duration
RetryPeriod time.Duration
PodName string
opts *ServerOptions
cacheNetworks map[string]networkInfo
cacheIPs map[string]ipInfo
mutex sync.Mutex
}

HealthServer *health.Server
// ServerOptions contains the options to configure the IPAM server.
type ServerOptions struct {
Port int
SyncFrequency time.Duration
}

// New creates a new instance of the LiqoIPAM.
func New(ctx context.Context, opts *Options) (*LiqoIPAM, error) {
opts.HealthServer.SetServingStatus(IPAM_ServiceDesc.ServiceName, grpc_health_v1.HealthCheckResponse_NOT_SERVING)
func New(ctx context.Context, cl client.Client, opts *ServerOptions) (*LiqoIPAM, error) {
hs := health.NewServer()
hs.SetServingStatus(IPAM_ServiceDesc.ServiceName, grpc_health_v1.HealthCheckResponse_NOT_SERVING)

lipam := &LiqoIPAM{
Options: opts,
HealthServer: hs,

Client: cl,

opts: opts,
cacheNetworks: make(map[string]networkInfo),
cacheIPs: make(map[string]ipInfo),
}

// Initialize the IPAM instance
if err := lipam.initialize(ctx); err != nil {
return nil, err
}

opts.HealthServer.SetServingStatus(IPAM_ServiceDesc.ServiceName, grpc_health_v1.HealthCheckResponse_SERVING)
// Launch sync routine
go lipam.sync(ctx, opts.SyncFrequency)

hs.SetServingStatus(IPAM_ServiceDesc.ServiceName, grpc_health_v1.HealthCheckResponse_SERVING)

return lipam, nil
}

Expand Down
Loading