Skip to content

Commit

Permalink
Support for in-cluster lb
Browse files Browse the repository at this point in the history
  • Loading branch information
TrekkieCoder committed Jul 20, 2023
1 parent c08cca0 commit 2ac21c2
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 23 deletions.
2 changes: 1 addition & 1 deletion cmd/loxilb-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func run(o *Options) error {

loxilbClients := make([]*api.LoxiClient, 0)
loxilbPeerClients := make([]*api.LoxiClient, 0)
loxiLBLiveCh := make(chan *api.LoxiClient)
loxiLBLiveCh := make(chan *api.LoxiClient, 2)
loxiLBSelMasterEvent := make(chan bool)

if len(networkConfig.LoxilbURLs) > 0 {
Expand Down
4 changes: 3 additions & 1 deletion manifest/kube-loxilb.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ spec:
- --externalCIDR=123.123.123.1/24
#- --externalSecondaryCIDRs=124.124.124.1/24,125.125.125.1/24
#- --monitor
#- --setBGP
#- --setBGP=64512
#- --extBGPPeers=50.50.50.1:65101,51.51.51.1:65102
#- --setRoles
#- --setLBMode=1
#- --config=/opt/loxilb/agent/kube-loxilb.conf
resources:
Expand Down
61 changes: 40 additions & 21 deletions pkg/agent/manager/loadbalancer/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ type SvcPair struct {
IPString string
Port int32
Protocol string
InRange bool
}

// GenKey generate key for cache
Expand Down Expand Up @@ -574,8 +575,10 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
}
klog.Infof("deallocateOnFailure defer function called")
for _, sp := range ingSvcPairs {
klog.Infof("ip %s is newIP so retrieve pool", sp.IPString)
ipPool.ReturnIPAddr(sp.IPString, uint32(sp.Port), sp.Protocol)
if sp.InRange {
klog.Infof("Returning ip %s to free pool", sp.IPString)
ipPool.ReturnIPAddr(sp.IPString, uint32(sp.Port), sp.Protocol)
}
for idx, ingSecIP := range m.lbCache[cacheKey].SecIPs {
if idx < len(sipPools) {
sipPools[idx].ReturnIPAddr(ingSecIP, uint32(sp.Port), sp.Protocol)
Expand Down Expand Up @@ -641,9 +644,11 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
return fmt.Errorf("failed to add loxiLB loadBalancer")
}
m.lbCache[cacheKey].LbModelList = append(m.lbCache[cacheKey].LbModelList, lbModelList...)
retIngress := corev1.LoadBalancerIngress{IP: ingSvcPair.IPString}
retIngress.Ports = append(retIngress.Ports, corev1.PortStatus{Port: ingSvcPair.Port, Protocol: corev1.Protocol(strings.ToUpper(ingSvcPair.Protocol))})
svc.Status.LoadBalancer.Ingress = append(svc.Status.LoadBalancer.Ingress, retIngress)
if ingSvcPair.InRange {
retIngress := corev1.LoadBalancerIngress{IP: ingSvcPair.IPString}
retIngress.Ports = append(retIngress.Ports, corev1.PortStatus{Port: ingSvcPair.Port, Protocol: corev1.Protocol(strings.ToUpper(ingSvcPair.Protocol))})
svc.Status.LoadBalancer.Ingress = append(svc.Status.LoadBalancer.Ingress, retIngress)
}
klog.Infof("added load-balancer")
}

Expand Down Expand Up @@ -844,7 +849,14 @@ func (m *Manager) getLBIngressSvcPairs(service *corev1.Service) []SvcPair {
var spairs []SvcPair
for _, ingress := range service.Status.LoadBalancer.Ingress {
for _, port := range service.Spec.Ports {
sp := SvcPair{ingress.IP, port.Port, strings.ToLower(string(port.Protocol))}
sp := SvcPair{ingress.IP, port.Port, strings.ToLower(string(port.Protocol)), false}
spairs = append(spairs, sp)
}
}

for _, extIP := range service.Spec.ExternalIPs {
for _, port := range service.Spec.Ports {
sp := SvcPair{extIP, port.Port, strings.ToLower(string(port.Protocol)), false}
spairs = append(spairs, sp)
}
}
Expand All @@ -858,31 +870,29 @@ func (m *Manager) getIngressSvcPairs(service *corev1.Service, addrType string) (
var sPairs []SvcPair
inSPairs := m.getLBIngressSvcPairs(service)
isHasLoxiExternalIP := false
inRange := false

ipPool := m.ExternalIPPool
if addrType == "ipv6" || addrType == "ipv6to4" {
ipPool = m.ExternalIP6Pool
}
//klog.Infof("inSpairs: %v", inSPairs)

// k8s service has ingress IP already
if len(inSPairs) >= 1 {
for _, inSPair := range inSPairs {
ident := inSPair.Port
proto := inSPair.Protocol

inRange, _ := ipPool.CheckAndReserveIP(inSPair.IPString, uint32(ident), proto)
if inRange {
sp := SvcPair{inSPair.IPString, ident, inSPair.Protocol}
sPairs = append(sPairs, sp)
isHasLoxiExternalIP = true
}
inRange, _ = ipPool.CheckAndReserveIP(inSPair.IPString, uint32(ident), proto)
sp := SvcPair{inSPair.IPString, ident, inSPair.Protocol, inRange}
sPairs = append(sPairs, sp)
isHasLoxiExternalIP = true
}
}

// If isHasLoxiExternalIP is false, that means:
// 1. k8s service has no ingress IP
// 2. k8s service has ingress IPs, but that is outside the range of kube-loxilb's externalIP.
// so that service need to be allowed new external IP
if !isHasLoxiExternalIP {
for _, port := range service.Spec.Ports {
proto := strings.ToLower(string(port.Protocol))
Expand All @@ -898,11 +908,11 @@ func (m *Manager) getIngressSvcPairs(service *corev1.Service, addrType string) (
klog.Errorf("failed to generate external IP. IP Pool is full")
return nil, errors.New("failed to generate external IP. IP Pool is full")
}
sp := SvcPair{newIP.String(), portNum, proto}
sp := SvcPair{newIP.String(), portNum, proto, true}
sPairs = append(sPairs, sp)
}
}

//klog.Infof("Spairs: %v", sPairs)
return sPairs, nil
}

Expand Down Expand Up @@ -940,7 +950,7 @@ func (m *Manager) getIngressSecSvcPairs(service *corev1.Service, numSecondary in
klog.Errorf("failed to generate external secondary IP. IP Pool is full")
return nil, errors.New("failed to generate external secondary IP. IP Pool is full")
}
sp := SvcPair{newIP.String(), portNum, proto}
sp := SvcPair{newIP.String(), portNum, proto, true}
sPairs = append(sPairs, sp)
}
}
Expand Down Expand Up @@ -1085,7 +1095,7 @@ loop:
}
}
case aliveClient := <-loxiAliveCh:
loop1:
aliveClient.DoBGPCfg = false
if m.networkConfig.SetRoles && m.ElectionRunOnce {
cisModel, err := m.makeLoxiLBCIStatusModel("default", aliveClient)
if err == nil {
Expand Down Expand Up @@ -1121,7 +1131,10 @@ loop:
klog.Infof("set-bgp-global cfg - failed")
if strings.Contains(err.Error(), "connection refused") {
time.Sleep(2 * time.Second)
goto loop1
if !aliveClient.DoBGPCfg {
loxiAliveCh <- aliveClient
aliveClient.DoBGPCfg = true
}
}
}

Expand All @@ -1133,7 +1146,10 @@ loop:
klog.Infof("set-bgp-neigh(%s) cfg - failed", bgpPeer)
if strings.Contains(err.Error(), "connection refused") {
time.Sleep(2 * time.Second)
goto loop1
if !aliveClient.DoBGPCfg {
loxiAliveCh <- aliveClient
aliveClient.DoBGPCfg = true
}
}
}
}
Expand Down Expand Up @@ -1162,7 +1178,10 @@ loop:
time.Sleep(1 * time.Second)
if strings.Contains(err.Error(), "connection refused") {
time.Sleep(2 * time.Second)
goto loop1
if !aliveClient.DoBGPCfg {
loxiAliveCh <- aliveClient
aliveClient.DoBGPCfg = true
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type LoxiClient struct {
Host string
Port string
IsAlive bool
DoBGPCfg bool
Stop chan struct{}
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/ippool/ippool.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ func (i *IPPool) ReturnIPAddr(ip string, sIdent uint32, proto string) {
sIdent = 0
}

IP := net.ParseIP(ip)
if IP != nil || !i.NetCIDR.Contains(IP) {
return
}

klog.Infof("Release ServiceIP %s:%v", ip, sIdent)

i.IPAlloc.DeAllocateIP(tk.IPClusterDefault, i.CIDR, sIdent, ip, proto)
Expand Down

0 comments on commit 2ac21c2

Please sign in to comment.