diff --git a/cmd/loxilb-agent/main.go b/cmd/loxilb-agent/main.go index d59ba6f..7afe9c0 100644 --- a/cmd/loxilb-agent/main.go +++ b/cmd/loxilb-agent/main.go @@ -73,6 +73,7 @@ func newAgentCommand() *cobra.Command { Long: "The loxilb agent runs on each node.", Run: func(cmd *cobra.Command, args []string) { log.InitLogFileLimits(cmd.Flags()) + log.InitLogLevel(cmd.Flags()) if err := opts.complete(args); err != nil { klog.Errorf("Failed to options complete. err: %v", err) os.Exit(255) diff --git a/pkg/agent/manager/loadbalancer/loadbalancer.go b/pkg/agent/manager/loadbalancer/loadbalancer.go index b885715..485b01c 100644 --- a/pkg/agent/manager/loadbalancer/loadbalancer.go +++ b/pkg/agent/manager/loadbalancer/loadbalancer.go @@ -20,6 +20,13 @@ import ( "context" "errors" "fmt" + "net" + "path" + "reflect" + "strconv" + "strings" + "time" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -32,12 +39,6 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" - "net" - "path" - "reflect" - "strconv" - "strings" - "time" "github.com/loxilb-io/kube-loxilb/pkg/agent/config" "github.com/loxilb-io/kube-loxilb/pkg/api" @@ -121,10 +122,15 @@ type LbCacheKey struct { } type SvcPair struct { - IPString string - Port int32 - Protocol string - InRange bool + IPString string + Port int32 + Protocol string + InRange bool + K8sSvcPort corev1.ServicePort +} + +func (s SvcPair) GenKey() string { + return fmt.Sprintf("%s:%d:%s:%v", s.IPString, s.Port, s.Protocol, s.InRange) } // GenKey generate key for cache @@ -291,10 +297,12 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error { probeResp := "" if strings.Compare(*lbClassName, m.networkConfig.LoxilbLoadBalancerClass) != 0 { + klog.V(4).Infof("kube-loxilb don't manage '%s' LoadBalancerClass.", *lbClassName) return nil } // Check for loxilb specific annotations - Secondary IPs + klog.V(4).Info("check annotations...") if na := svc.Annotations[numSecIPAnnotation]; na != "" { num, err := strconv.Atoi(na) if err != nil { @@ -302,6 +310,7 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error { } else { numSecondarySvc = num } + klog.V(4).Info("service %s have numSecIPAnnotation annotation: %s", svc.Name, na) } // Check for loxilb specific annotations - Timeout @@ -310,6 +319,7 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error { if err == nil { timeout = num } + klog.V(4).Info("service %s have lbTimeoutAnnotation annotation: %s", svc.Name, to) } // Check for loxilb specific annotations - NAT LB Mode @@ -323,13 +333,18 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error { } else { lbMode = -1 } + klog.V(4).Info("service %s have lbModeAnnotation annotation: %s", svc.Name, lbm) } + klog.V(4).Info("service %s is created %s mode.", svc.Name, lbMode) + // Check for loxilb specific annotations - Liveness Check if lchk := svc.Annotations[livenessAnnotation]; lchk != "" { if lchk == "yes" { livenessCheck = true } + + klog.V(4).Info("service %s have livenessAnnotation annotation: %s", svc.Name, lchk) } // Check for loxilb specific annotations - Liveness Probe Type @@ -385,6 +400,8 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error { } else if lba == "nat64" { addrType = "ipv6to4" } + + klog.V(4).Info("service %s have lbAddressAnnotation annotation: %s", svc.Name, lba) } if addrType != "ipv4" && numSecondarySvc != 0 { @@ -399,6 +416,8 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error { return err } + klog.V(4).Infof("get endpointIPs: %v", endpointIPs) + cacheKey := GenKey(svc.Namespace, svc.Name) _, added := m.lbCache[cacheKey] if !added { @@ -419,6 +438,8 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error { Addr: addrType, SecIPs: []string{}, } + + klog.V(4).Info("new %s LB entry is added to cache", svc.Name) } oldsvc := svc.DeepCopy() @@ -510,6 +531,7 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error { if addrType == "ipv6" || addrType == "ipv6to4" { sipPools = m.ExtSecondaryIP6Pools } + for idx, ingSecIP := range m.lbCache[cacheKey].SecIPs { if idx < len(sipPools) { for _, lb := range m.lbCache[cacheKey].LbModelList { @@ -588,30 +610,29 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error { } }() + var lbModelList []api.LoadBalancerModel for _, ingSvcPair := range ingSvcPairs { var errChList []chan error - var lbModelList []api.LoadBalancerModel - for _, port := range svc.Spec.Ports { - lbArgs := LbArgs{ - externalIP: ingSvcPair.IPString, - livenessCheck: m.lbCache[cacheKey].ActCheck, - lbMode: m.lbCache[cacheKey].LbMode, - timeout: m.lbCache[cacheKey].Timeout, - probeType: m.lbCache[cacheKey].ProbeType, - probePort: m.lbCache[cacheKey].ProbePort, - probeReq: m.lbCache[cacheKey].ProbeReq, - probeResp: m.lbCache[cacheKey].ProbeResp, - needPodEP: needPodEP, - } - lbArgs.secIPs = append(lbArgs.secIPs, m.lbCache[cacheKey].SecIPs...) - lbArgs.endpointIPs = append(lbArgs.endpointIPs, endpointIPs...) - lbModel, err := m.makeLoxiLoadBalancerModel(&lbArgs, svc, port) - if err != nil { - return err - } - lbModelList = append(lbModelList, lbModel) + lbArgs := LbArgs{ + externalIP: ingSvcPair.IPString, + livenessCheck: m.lbCache[cacheKey].ActCheck, + lbMode: m.lbCache[cacheKey].LbMode, + timeout: m.lbCache[cacheKey].Timeout, + probeType: m.lbCache[cacheKey].ProbeType, + probePort: m.lbCache[cacheKey].ProbePort, + probeReq: m.lbCache[cacheKey].ProbeReq, + probeResp: m.lbCache[cacheKey].ProbeResp, + needPodEP: needPodEP, } + lbArgs.secIPs = append(lbArgs.secIPs, m.lbCache[cacheKey].SecIPs...) + lbArgs.endpointIPs = append(lbArgs.endpointIPs, endpointIPs...) + + lbModel, err := m.makeLoxiLoadBalancerModel(&lbArgs, svc, ingSvcPair.K8sSvcPort) + if err != nil { + return err + } + lbModelList = append(lbModelList, lbModel) ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() @@ -643,7 +664,7 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error { klog.Errorf("failed to add load-balancer") return fmt.Errorf("failed to add loxiLB loadBalancer") } - m.lbCache[cacheKey].LbModelList = append(m.lbCache[cacheKey].LbModelList, lbModelList...) + if ingSvcPair.InRange { retIngress := corev1.LoadBalancerIngress{IP: ingSvcPair.IPString} retIngress.Ports = append(retIngress.Ports, corev1.PortStatus{Port: ingSvcPair.Port, Protocol: corev1.Protocol(strings.ToUpper(ingSvcPair.Protocol))}) @@ -653,6 +674,7 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error { } // Update service.Status.LoadBalancer.Ingress + m.lbCache[cacheKey].LbModelList = append(m.lbCache[cacheKey].LbModelList, lbModelList...) m.updateService(oldsvc, svc) return nil @@ -685,6 +707,7 @@ func (m *Manager) deleteLoadBalancer(ns, name string) error { sipPools = m.ExtSecondaryIP6Pools } + klog.V(4).Infof("delete lb entry: len (%d)", len(lbEntry.LbModelList)) for _, lb := range lbEntry.LbModelList { var errChList []chan error for _, loxiClient := range m.LoxiClients { @@ -700,6 +723,9 @@ func (m *Manager) deleteLoadBalancer(ns, name string) error { isError := true for _, errCh := range errChList { err := <-errCh + if err != nil { + klog.V(4).Infof("error: %v", err) + } if err == nil { isError = false break @@ -733,10 +759,10 @@ func (m *Manager) DeleteAllLoadBalancer() { } for _, lb := range lbEntry.LbModelList { - var errChList []chan error + //var errChList []chan error for _, loxiClient := range m.LoxiClients { - ch := make(chan error) - errChList = append(errChList, ch) + //ch := make(chan error) + //errChList = append(errChList, ch) klog.Infof("called loxilb API: delete lb rule %v", lb) loxiClient.LoadBalancer().Delete(context.Background(), &lb) @@ -751,8 +777,6 @@ func (m *Manager) DeleteAllLoadBalancer() { } } m.lbCache = nil - - return } // getEndpoints return LB's endpoints IP list. @@ -846,21 +870,31 @@ func (m *Manager) getEndpointsForLB(nodes []*corev1.Node, addrType string) []str } func (m *Manager) getLBIngressSvcPairs(service *corev1.Service) []SvcPair { + checkSet := ippool.NewSet() var spairs []SvcPair for _, ingress := range service.Status.LoadBalancer.Ingress { for _, port := range service.Spec.Ports { - sp := SvcPair{ingress.IP, port.Port, strings.ToLower(string(port.Protocol)), false} - spairs = append(spairs, sp) + sp := SvcPair{ingress.IP, port.Port, strings.ToLower(string(port.Protocol)), false, port} + spKey := sp.GenKey() + if !checkSet.Contains(spKey) { + checkSet.Add(spKey) + spairs = append(spairs, sp) + } } } for _, extIP := range service.Spec.ExternalIPs { for _, port := range service.Spec.Ports { - sp := SvcPair{extIP, port.Port, strings.ToLower(string(port.Protocol)), false} - spairs = append(spairs, sp) + sp := SvcPair{extIP, port.Port, strings.ToLower(string(port.Protocol)), false, port} + spKey := sp.GenKey() + if !checkSet.Contains(spKey) { + checkSet.Add(spKey) + spairs = append(spairs, sp) + } } } + checkSet = nil return spairs } @@ -880,15 +914,17 @@ func (m *Manager) getIngressSvcPairs(service *corev1.Service, addrType string) ( // k8s service has ingress IP already if len(inSPairs) >= 1 { + klog.V(4).Infof("lb service %s is already have ingress list: %v", service.Name, inSPairs) for _, inSPair := range inSPairs { ident := inSPair.Port proto := inSPair.Protocol inRange, _ = ipPool.CheckAndReserveIP(inSPair.IPString, uint32(ident), proto) - sp := SvcPair{inSPair.IPString, ident, inSPair.Protocol, inRange} + sp := SvcPair{inSPair.IPString, ident, inSPair.Protocol, inRange, inSPair.K8sSvcPort} sPairs = append(sPairs, sp) isHasLoxiExternalIP = true } + klog.V(4).Infof("search ingress info to lb cache.. find: %v", sPairs) } // If isHasLoxiExternalIP is false, that means: @@ -900,6 +936,7 @@ func (m *Manager) getIngressSvcPairs(service *corev1.Service, addrType string) ( newIP := ipPool.GetNewIPAddr(uint32(portNum), proto) if newIP == nil { // This is a safety code in case the service has the same port. + klog.V(4).Infof("isHasLoxiExternalIP is true. and ingress info %v. lb cache info: %v", inSPairs, sPairs) for _, s := range sPairs { if s.Port == portNum && s.Protocol == proto { continue @@ -908,7 +945,7 @@ func (m *Manager) getIngressSvcPairs(service *corev1.Service, addrType string) ( klog.Errorf("failed to generate external IP. IP Pool is full") return nil, errors.New("failed to generate external IP. IP Pool is full") } - sp := SvcPair{newIP.String(), portNum, proto, true} + sp := SvcPair{newIP.String(), portNum, proto, true, port} sPairs = append(sPairs, sp) } } @@ -950,7 +987,7 @@ func (m *Manager) getIngressSecSvcPairs(service *corev1.Service, numSecondary in klog.Errorf("failed to generate external secondary IP. IP Pool is full") return nil, errors.New("failed to generate external secondary IP. IP Pool is full") } - sp := SvcPair{newIP.String(), portNum, proto, true} + sp := SvcPair{newIP.String(), portNum, proto, true, port} sPairs = append(sPairs, sp) } } diff --git a/pkg/api/lb.go b/pkg/api/lb.go index a20ab20..3600677 100644 --- a/pkg/api/lb.go +++ b/pkg/api/lb.go @@ -3,6 +3,7 @@ package api import ( "context" "fmt" + "net/http" ) type EpSelect uint @@ -128,8 +129,10 @@ func (l *LoadBalancerAPI) Delete(ctx context.Context, lbModel LoxiModel) error { } resp := l.client.DELETE(l.resource).SubResource(subresources).Query(queryParam).Body(lbModel).Do(ctx) - if resp.err != nil { - return resp.err + if resp.statusCode != http.StatusOK { + if resp.err != nil { + return resp.err + } } return nil diff --git a/pkg/api/request.go b/pkg/api/request.go index 873e701..dc00016 100644 --- a/pkg/api/request.go +++ b/pkg/api/request.go @@ -130,11 +130,11 @@ func (l *LoxiRequest) Do(ctx context.Context) *LoxiResponse { if resp.StatusCode == http.StatusOK { if err := json.Unmarshal(respByte, &result); err != nil { - return &LoxiResponse{err: err} + return &LoxiResponse{statusCode: resp.StatusCode, err: err} } if result.Result != "Success" && !strings.Contains(result.Result, "exist") { - return &LoxiResponse{err: errors.New(result.Result)} + return &LoxiResponse{statusCode: resp.StatusCode, err: errors.New(result.Result)} } } diff --git a/pkg/log/logFile.go b/pkg/log/logFile.go index a53fb90..0d2895f 100644 --- a/pkg/log/logFile.go +++ b/pkg/log/logFile.go @@ -35,7 +35,7 @@ const ( logFileFlag = "log_file" maxSizeFlag = "log_file_max_size" maxNumFlag = "log_file_max_num" - + verboseFlag = "v" // Check log file number every 10 mins. logFileCheckInterval = time.Minute * 10 // Allowed maximum value for the maximum file size limit. @@ -46,6 +46,7 @@ var ( maxNumArg = uint16(0) logFileMaxNum = uint16(0) logDir = "" + verbose = "0" executableName = filepath.Base(os.Args[0]) ) @@ -53,6 +54,7 @@ var ( func AddFlags(fs *pflag.FlagSet) { fs.Uint16Var(&maxNumArg, maxNumFlag, maxNumArg, "Maximum number of log files per severity level to be kept. Value 0 means unlimited.") fs.StringVar(&logDir, logDirFlag, logDir, "log file directory") + fs.StringVar(&verbose, verboseFlag, verbose, "verbose level") } // InitLogFileLimits initializes log file maximum size and maximum number limits based on the diff --git a/pkg/log/logLevel.go b/pkg/log/logLevel.go new file mode 100644 index 0000000..bfde147 --- /dev/null +++ b/pkg/log/logLevel.go @@ -0,0 +1,42 @@ +// Copyright 2023 Netlox Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package log + +import ( + "github.com/spf13/pflag" + + "k8s.io/klog/v2" +) + +const logVerbosityFlag = "v" + +// GetCurrentLogLevel returns the current log verbosity level. +func GetCurrentLogLevel(fs *pflag.FlagSet) string { + return fs.Lookup(logVerbosityFlag).Value.String() +} + +// InitLogLevel sets the log verbosity level when init time +func InitLogLevel(fs *pflag.FlagSet) error { + level := GetCurrentLogLevel(fs) + + var l klog.Level + err := l.Set(level) + if err != nil { + return err + } + + klog.Infof("Set log level to %s", level) + return nil +}