Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed a bug that the lb service could not be recreated if deleted. (and add verbose log level option) #20

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/loxilb-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func newAgentCommand() *cobra.Command {
Long: "The loxilb agent runs on each node.",
Run: func(cmd *cobra.Command, args []string) {
log.InitLogFileLimits(cmd.Flags())
log.InitLogLevel(cmd.Flags())
if err := opts.complete(args); err != nil {
klog.Errorf("Failed to options complete. err: %v", err)
os.Exit(255)
Expand Down
123 changes: 80 additions & 43 deletions pkg/agent/manager/loadbalancer/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ import (
"context"
"errors"
"fmt"
"net"
"path"
"reflect"
"strconv"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -32,12 +39,6 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"net"
"path"
"reflect"
"strconv"
"strings"
"time"

"github.com/loxilb-io/kube-loxilb/pkg/agent/config"
"github.com/loxilb-io/kube-loxilb/pkg/api"
Expand Down Expand Up @@ -121,10 +122,15 @@ type LbCacheKey struct {
}

type SvcPair struct {
IPString string
Port int32
Protocol string
InRange bool
IPString string
Port int32
Protocol string
InRange bool
K8sSvcPort corev1.ServicePort
}

func (s SvcPair) GenKey() string {
return fmt.Sprintf("%s:%d:%s:%v", s.IPString, s.Port, s.Protocol, s.InRange)
}

// GenKey generate key for cache
Expand Down Expand Up @@ -291,17 +297,20 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
probeResp := ""

if strings.Compare(*lbClassName, m.networkConfig.LoxilbLoadBalancerClass) != 0 {
klog.V(4).Infof("kube-loxilb don't manage '%s' LoadBalancerClass.", *lbClassName)
return nil
}

// Check for loxilb specific annotations - Secondary IPs
klog.V(4).Info("check annotations...")
if na := svc.Annotations[numSecIPAnnotation]; na != "" {
num, err := strconv.Atoi(na)
if err != nil {
numSecondarySvc = 0
} else {
numSecondarySvc = num
}
klog.V(4).Info("service %s have numSecIPAnnotation annotation: %s", svc.Name, na)
}

// Check for loxilb specific annotations - Timeout
Expand All @@ -310,6 +319,7 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
if err == nil {
timeout = num
}
klog.V(4).Info("service %s have lbTimeoutAnnotation annotation: %s", svc.Name, to)
}

// Check for loxilb specific annotations - NAT LB Mode
Expand All @@ -323,13 +333,18 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
} else {
lbMode = -1
}
klog.V(4).Info("service %s have lbModeAnnotation annotation: %s", svc.Name, lbm)
}

klog.V(4).Info("service %s is created %s mode.", svc.Name, lbMode)

// Check for loxilb specific annotations - Liveness Check
if lchk := svc.Annotations[livenessAnnotation]; lchk != "" {
if lchk == "yes" {
livenessCheck = true
}

klog.V(4).Info("service %s have livenessAnnotation annotation: %s", svc.Name, lchk)
}

// Check for loxilb specific annotations - Liveness Probe Type
Expand Down Expand Up @@ -385,6 +400,8 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
} else if lba == "nat64" {
addrType = "ipv6to4"
}

klog.V(4).Info("service %s have lbAddressAnnotation annotation: %s", svc.Name, lba)
}

if addrType != "ipv4" && numSecondarySvc != 0 {
Expand All @@ -399,6 +416,8 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
return err
}

klog.V(4).Infof("get endpointIPs: %v", endpointIPs)

cacheKey := GenKey(svc.Namespace, svc.Name)
_, added := m.lbCache[cacheKey]
if !added {
Expand All @@ -419,6 +438,8 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
Addr: addrType,
SecIPs: []string{},
}

klog.V(4).Info("new %s LB entry is added to cache", svc.Name)
}

oldsvc := svc.DeepCopy()
Expand Down Expand Up @@ -510,6 +531,7 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
if addrType == "ipv6" || addrType == "ipv6to4" {
sipPools = m.ExtSecondaryIP6Pools
}

for idx, ingSecIP := range m.lbCache[cacheKey].SecIPs {
if idx < len(sipPools) {
for _, lb := range m.lbCache[cacheKey].LbModelList {
Expand Down Expand Up @@ -588,30 +610,29 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
}
}()

var lbModelList []api.LoadBalancerModel
for _, ingSvcPair := range ingSvcPairs {
var errChList []chan error
var lbModelList []api.LoadBalancerModel
for _, port := range svc.Spec.Ports {
lbArgs := LbArgs{
externalIP: ingSvcPair.IPString,
livenessCheck: m.lbCache[cacheKey].ActCheck,
lbMode: m.lbCache[cacheKey].LbMode,
timeout: m.lbCache[cacheKey].Timeout,
probeType: m.lbCache[cacheKey].ProbeType,
probePort: m.lbCache[cacheKey].ProbePort,
probeReq: m.lbCache[cacheKey].ProbeReq,
probeResp: m.lbCache[cacheKey].ProbeResp,
needPodEP: needPodEP,
}
lbArgs.secIPs = append(lbArgs.secIPs, m.lbCache[cacheKey].SecIPs...)
lbArgs.endpointIPs = append(lbArgs.endpointIPs, endpointIPs...)

lbModel, err := m.makeLoxiLoadBalancerModel(&lbArgs, svc, port)
if err != nil {
return err
}
lbModelList = append(lbModelList, lbModel)
lbArgs := LbArgs{
externalIP: ingSvcPair.IPString,
livenessCheck: m.lbCache[cacheKey].ActCheck,
lbMode: m.lbCache[cacheKey].LbMode,
timeout: m.lbCache[cacheKey].Timeout,
probeType: m.lbCache[cacheKey].ProbeType,
probePort: m.lbCache[cacheKey].ProbePort,
probeReq: m.lbCache[cacheKey].ProbeReq,
probeResp: m.lbCache[cacheKey].ProbeResp,
needPodEP: needPodEP,
}
lbArgs.secIPs = append(lbArgs.secIPs, m.lbCache[cacheKey].SecIPs...)
lbArgs.endpointIPs = append(lbArgs.endpointIPs, endpointIPs...)

lbModel, err := m.makeLoxiLoadBalancerModel(&lbArgs, svc, ingSvcPair.K8sSvcPort)
if err != nil {
return err
}
lbModelList = append(lbModelList, lbModel)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
Expand Down Expand Up @@ -643,7 +664,7 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
klog.Errorf("failed to add load-balancer")
return fmt.Errorf("failed to add loxiLB loadBalancer")
}
m.lbCache[cacheKey].LbModelList = append(m.lbCache[cacheKey].LbModelList, lbModelList...)

if ingSvcPair.InRange {
retIngress := corev1.LoadBalancerIngress{IP: ingSvcPair.IPString}
retIngress.Ports = append(retIngress.Ports, corev1.PortStatus{Port: ingSvcPair.Port, Protocol: corev1.Protocol(strings.ToUpper(ingSvcPair.Protocol))})
Expand All @@ -653,6 +674,7 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
}

// Update service.Status.LoadBalancer.Ingress
m.lbCache[cacheKey].LbModelList = append(m.lbCache[cacheKey].LbModelList, lbModelList...)
m.updateService(oldsvc, svc)

return nil
Expand Down Expand Up @@ -685,6 +707,7 @@ func (m *Manager) deleteLoadBalancer(ns, name string) error {
sipPools = m.ExtSecondaryIP6Pools
}

klog.V(4).Infof("delete lb entry: len (%d)", len(lbEntry.LbModelList))
for _, lb := range lbEntry.LbModelList {
var errChList []chan error
for _, loxiClient := range m.LoxiClients {
Expand All @@ -700,6 +723,9 @@ func (m *Manager) deleteLoadBalancer(ns, name string) error {
isError := true
for _, errCh := range errChList {
err := <-errCh
if err != nil {
klog.V(4).Infof("error: %v", err)
}
if err == nil {
isError = false
break
Expand Down Expand Up @@ -733,10 +759,10 @@ func (m *Manager) DeleteAllLoadBalancer() {
}

for _, lb := range lbEntry.LbModelList {
var errChList []chan error
//var errChList []chan error
for _, loxiClient := range m.LoxiClients {
ch := make(chan error)
errChList = append(errChList, ch)
//ch := make(chan error)
//errChList = append(errChList, ch)

klog.Infof("called loxilb API: delete lb rule %v", lb)
loxiClient.LoadBalancer().Delete(context.Background(), &lb)
Expand All @@ -751,8 +777,6 @@ func (m *Manager) DeleteAllLoadBalancer() {
}
}
m.lbCache = nil

return
}

// getEndpoints return LB's endpoints IP list.
Expand Down Expand Up @@ -846,21 +870,31 @@ func (m *Manager) getEndpointsForLB(nodes []*corev1.Node, addrType string) []str
}

func (m *Manager) getLBIngressSvcPairs(service *corev1.Service) []SvcPair {
checkSet := ippool.NewSet()
var spairs []SvcPair
for _, ingress := range service.Status.LoadBalancer.Ingress {
for _, port := range service.Spec.Ports {
sp := SvcPair{ingress.IP, port.Port, strings.ToLower(string(port.Protocol)), false}
spairs = append(spairs, sp)
sp := SvcPair{ingress.IP, port.Port, strings.ToLower(string(port.Protocol)), false, port}
spKey := sp.GenKey()
if !checkSet.Contains(spKey) {
checkSet.Add(spKey)
spairs = append(spairs, sp)
}
}
}

for _, extIP := range service.Spec.ExternalIPs {
for _, port := range service.Spec.Ports {
sp := SvcPair{extIP, port.Port, strings.ToLower(string(port.Protocol)), false}
spairs = append(spairs, sp)
sp := SvcPair{extIP, port.Port, strings.ToLower(string(port.Protocol)), false, port}
spKey := sp.GenKey()
if !checkSet.Contains(spKey) {
checkSet.Add(spKey)
spairs = append(spairs, sp)
}
}
}

checkSet = nil
return spairs
}

Expand All @@ -880,15 +914,17 @@ func (m *Manager) getIngressSvcPairs(service *corev1.Service, addrType string) (

// k8s service has ingress IP already
if len(inSPairs) >= 1 {
klog.V(4).Infof("lb service %s is already have ingress list: %v", service.Name, inSPairs)
for _, inSPair := range inSPairs {
ident := inSPair.Port
proto := inSPair.Protocol

inRange, _ = ipPool.CheckAndReserveIP(inSPair.IPString, uint32(ident), proto)
sp := SvcPair{inSPair.IPString, ident, inSPair.Protocol, inRange}
sp := SvcPair{inSPair.IPString, ident, inSPair.Protocol, inRange, inSPair.K8sSvcPort}
sPairs = append(sPairs, sp)
isHasLoxiExternalIP = true
}
klog.V(4).Infof("search ingress info to lb cache.. find: %v", sPairs)
}

// If isHasLoxiExternalIP is false, that means:
Expand All @@ -900,6 +936,7 @@ func (m *Manager) getIngressSvcPairs(service *corev1.Service, addrType string) (
newIP := ipPool.GetNewIPAddr(uint32(portNum), proto)
if newIP == nil {
// This is a safety code in case the service has the same port.
klog.V(4).Infof("isHasLoxiExternalIP is true. and ingress info %v. lb cache info: %v", inSPairs, sPairs)
for _, s := range sPairs {
if s.Port == portNum && s.Protocol == proto {
continue
Expand All @@ -908,7 +945,7 @@ func (m *Manager) getIngressSvcPairs(service *corev1.Service, addrType string) (
klog.Errorf("failed to generate external IP. IP Pool is full")
return nil, errors.New("failed to generate external IP. IP Pool is full")
}
sp := SvcPair{newIP.String(), portNum, proto, true}
sp := SvcPair{newIP.String(), portNum, proto, true, port}
sPairs = append(sPairs, sp)
}
}
Expand Down Expand Up @@ -950,7 +987,7 @@ func (m *Manager) getIngressSecSvcPairs(service *corev1.Service, numSecondary in
klog.Errorf("failed to generate external secondary IP. IP Pool is full")
return nil, errors.New("failed to generate external secondary IP. IP Pool is full")
}
sp := SvcPair{newIP.String(), portNum, proto, true}
sp := SvcPair{newIP.String(), portNum, proto, true, port}
sPairs = append(sPairs, sp)
}
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/api/lb.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package api
import (
"context"
"fmt"
"net/http"
)

type EpSelect uint
Expand Down Expand Up @@ -128,8 +129,10 @@ func (l *LoadBalancerAPI) Delete(ctx context.Context, lbModel LoxiModel) error {
}

resp := l.client.DELETE(l.resource).SubResource(subresources).Query(queryParam).Body(lbModel).Do(ctx)
if resp.err != nil {
return resp.err
if resp.statusCode != http.StatusOK {
if resp.err != nil {
return resp.err
}
}

return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/api/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,11 @@ func (l *LoxiRequest) Do(ctx context.Context) *LoxiResponse {

if resp.StatusCode == http.StatusOK {
if err := json.Unmarshal(respByte, &result); err != nil {
return &LoxiResponse{err: err}
return &LoxiResponse{statusCode: resp.StatusCode, err: err}
}

if result.Result != "Success" && !strings.Contains(result.Result, "exist") {
return &LoxiResponse{err: errors.New(result.Result)}
return &LoxiResponse{statusCode: resp.StatusCode, err: errors.New(result.Result)}
}
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/log/logFile.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const (
logFileFlag = "log_file"
maxSizeFlag = "log_file_max_size"
maxNumFlag = "log_file_max_num"

verboseFlag = "v"
// Check log file number every 10 mins.
logFileCheckInterval = time.Minute * 10
// Allowed maximum value for the maximum file size limit.
Expand All @@ -46,13 +46,15 @@ var (
maxNumArg = uint16(0)
logFileMaxNum = uint16(0)
logDir = ""
verbose = "0"

executableName = filepath.Base(os.Args[0])
)

func AddFlags(fs *pflag.FlagSet) {
fs.Uint16Var(&maxNumArg, maxNumFlag, maxNumArg, "Maximum number of log files per severity level to be kept. Value 0 means unlimited.")
fs.StringVar(&logDir, logDirFlag, logDir, "log file directory")
fs.StringVar(&verbose, verboseFlag, verbose, "verbose level")
}

// InitLogFileLimits initializes log file maximum size and maximum number limits based on the
Expand Down
Loading
Loading