Skip to content

Commit

Permalink
Support for in-cluster lb
Browse files Browse the repository at this point in the history
  • Loading branch information
TrekkieCoder committed Jul 19, 2023
1 parent 576acdc commit e13e4d5
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 38 deletions.
110 changes: 94 additions & 16 deletions cmd/loxilb-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,20 @@ package main

import (
"fmt"
"os"
"os/signal"
"syscall"
"time"

"github.com/loxilb-io/kube-loxilb/pkg/agent/config"
"github.com/loxilb-io/kube-loxilb/pkg/agent/manager/loadbalancer"
"github.com/loxilb-io/kube-loxilb/pkg/api"
"github.com/loxilb-io/kube-loxilb/pkg/ippool"
"github.com/loxilb-io/kube-loxilb/pkg/k8s"
"github.com/loxilb-io/kube-loxilb/pkg/log"
"net"
"os"
"os/signal"
"sort"
"syscall"
"time"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -141,20 +143,14 @@ func run(o *Options) error {
}
}

loxiAliveCh := make(chan *api.LoxiClient)
var loxilbClients []*api.LoxiClient
for _, lbURL := range networkConfig.LoxilbURLs {
loxilbClient, err := api.NewLoxiClient(lbURL)
if err != nil {
return err
}
loxilbClient.SetLoxiHealthCheckChan(stopCh, loxiAliveCh)
loxilbClients = append(loxilbClients, loxilbClient)
}
loxilbClients := make([]*api.LoxiClient, 0)
loxilbPeerClients := make([]*api.LoxiClient, 0)
loxiLBLiveCh := make(chan *api.LoxiClient)

lbManager := loadbalancer.NewLoadBalancerManager(
k8sClient,
loxilbClients,
loxilbPeerClients,
ipPool,
sipPools,
ipPool6,
Expand All @@ -163,10 +159,92 @@ func run(o *Options) error {
informerFactory,
)

if len(networkConfig.LoxilbURLs) > 0 {
for _, lbURL := range networkConfig.LoxilbURLs {
loxilbClient, err := api.NewLoxiClient(lbURL, loxiLBLiveCh, false)
if err != nil {
return err
}
loxilbClients = append(loxilbClients, loxilbClient)
}
} else {
go wait.Until(func() {

klog.Infof("DNS lookup:")

var tmploxilbClients []*api.LoxiClient
ips, err := net.LookupIP("loxilb-lb-service")
if err == nil {
for _, ip := range ips {
client, err2 := api.NewLoxiClient("http://"+ip.String()+":11111", loxiLBLiveCh, false)
if err2 != nil {
continue
}
tmploxilbClients = append(tmploxilbClients, client)
}
if len(tmploxilbClients) > 0 {
sort.Slice(tmploxilbClients, func(i, j int) bool {
return tmploxilbClients[i].Url < tmploxilbClients[j].Url
})
chg := false
if len(tmploxilbClients) != len(lbManager.LoxiClients) {
chg = true
} else {
for i, v := range lbManager.LoxiClients {
if v.Url != tmploxilbClients[i].Url {
chg = true
}
}
}
if chg == true {
for _, v := range lbManager.LoxiClients {
v.StopLoxiHealthCheckChan()
}
lbManager.LoxiClients = tmploxilbClients
}
}
}

var tmploxilbPeerClients []*api.LoxiClient
ips, err = net.LookupIP("loxilb-peer-service")
if err == nil {
for _, ip := range ips {
klog.Infof("loxilb-peer-service IN A %s\n", ip.String())
client, err2 := api.NewLoxiClient("http://"+ip.String()+":11111", loxiLBLiveCh, true)
if err2 != nil {
continue
}
tmploxilbPeerClients = append(tmploxilbPeerClients, client)
}
if len(tmploxilbPeerClients) > 0 {
sort.Slice(tmploxilbPeerClients, func(i, j int) bool {
return tmploxilbPeerClients[i].Url < tmploxilbPeerClients[j].Url
})
chg := false
if len(tmploxilbPeerClients) != len(lbManager.LoxiPeerClients) {
chg = true
} else {
for i, v := range lbManager.LoxiPeerClients {
if v.Url != tmploxilbPeerClients[i].Url {
chg = true
}
}
}
if chg == true {
for _, v := range lbManager.LoxiPeerClients {
v.StopLoxiHealthCheckChan()
}
lbManager.LoxiPeerClients = tmploxilbPeerClients
}
}
}
}, time.Second*20, stopCh)
}

log.StartLogFileNumberMonitor(stopCh)
informerFactory.Start(stopCh)

go lbManager.Run(stopCh, loxiAliveCh)
go lbManager.Run(stopCh, loxiLBLiveCh)

<-stopCh

Expand Down
2 changes: 1 addition & 1 deletion cmd/loxilb-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func Execute() {
Long: "loxilb-k8s",
}

client, err := api.NewLoxiClient("http://127.0.0.1:11111")
client, err := api.NewLoxiClient("http://127.0.0.1:11111", nil, false)
if err != nil {
return
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/loxilb-agent/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,9 @@ func (o *Options) setDefaults() {
if o.config.HostProcPathPrefix == "" {
o.config.HostProcPathPrefix = defaultHostProcPathPrefix
}
if o.config.LoxiURLs == nil {
o.config.LoxiURLs = []string{defaultLoxiURL}
}
//if o.config.LoxiURLs == nil {
// o.config.LoxiURLs = []string{defaultLoxiURL}
//}
if o.config.NodePortServiceVirtIP == "" {
o.config.NodePortServiceVirtIP = defaultnodePortServiceVirtIP
}
Expand Down
1 change: 1 addition & 0 deletions manifest/kube-loxilb.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ spec:
app: loxilb
spec:
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
tolerations:
- effect: NoSchedule
operator: Exists
Expand Down
57 changes: 57 additions & 0 deletions manifest/loxilb.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: loxilb-lb
namespace: kube-system
spec:
selector:
matchLabels:
app: loxilb-app
template:
metadata:
name: loxilb-lb
labels:
app: loxilb-app
spec:
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
tolerations:
- key: "node-role.kubernetes.io/master"
operator: Exists
- key: "node-role.kubernetes.io/control-plane"
operator: Exists
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: "node-role.kubernetes.io/master"
operator: Exists
- key: "node-role.kubernetes.io/control-plane"
operator: Exists
containers:
- name: loxilb-app
image: "ghcr.io/loxilb-io/loxilb:latest"
command: [ "/root/loxilb-io/loxilb/loxilb" ]
ports:
- containerPort: 11111
securityContext:
privileged: true
capabilities:
add:
- SYS_ADMIN
---
apiVersion: v1
kind: Service
metadata:
name: loxilb-lb-service
namespace: kube-system
spec:
clusterIP: None
selector:
app: loxilb-app
ports:
- name: loxilb-app
port: 11111
targetPort: 11111
protocol: TCP
19 changes: 11 additions & 8 deletions pkg/agent/manager/loadbalancer/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ const (

type Manager struct {
kubeClient clientset.Interface
loxiClients []*api.LoxiClient
LoxiClients []*api.LoxiClient
LoxiPeerClients []*api.LoxiClient
networkConfig *config.NetworkConfig
serviceInformer coreinformers.ServiceInformer
serviceLister corelisters.ServiceLister
Expand Down Expand Up @@ -135,6 +136,7 @@ func GenKey(ns, name string) string {
func NewLoadBalancerManager(
kubeClient clientset.Interface,
loxiClients []*api.LoxiClient,
loxiPeerClients []*api.LoxiClient,
externalIPPool *ippool.IPPool,
externalSecondaryIPPools []*ippool.IPPool,
externalIP6Pool *ippool.IPPool,
Expand All @@ -146,7 +148,8 @@ func NewLoadBalancerManager(
nodeInformer := informerFactory.Core().V1().Nodes()
manager := &Manager{
kubeClient: kubeClient,
loxiClients: loxiClients,
LoxiClients: loxiClients,
LoxiPeerClients: loxiPeerClients,
ExternalIPPool: externalIPPool,
ExtSecondaryIPPools: externalSecondaryIPPools,
ExternalIP6Pool: externalIP6Pool,
Expand Down Expand Up @@ -206,7 +209,7 @@ func (m *Manager) enqueueService(obj interface{}) {
m.queue.Add(key)
}

func (m *Manager) Run(stopCh <-chan struct{}, loxiAliveCh <-chan *api.LoxiClient) {
func (m *Manager) Run(stopCh <-chan struct{}, loxiLBLiveCh <-chan *api.LoxiClient) {
defer m.queue.ShutDown()

klog.Infof("Starting %s", mgrName)
Expand All @@ -220,7 +223,7 @@ func (m *Manager) Run(stopCh <-chan struct{}, loxiAliveCh <-chan *api.LoxiClient
return
}

go m.reinstallLoxiLbRules(stopCh, loxiAliveCh)
go m.reinstallLoxiLbRules(stopCh, loxiLBLiveCh)

for i := 0; i < defaultWorkers; i++ {
go wait.Until(m.worker, time.Second, stopCh)
Expand Down Expand Up @@ -609,7 +612,7 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
for _, client := range m.loxiClients {
for _, client := range m.LoxiClients {
ch := make(chan error)
go func(c *api.LoxiClient, h chan error) {
var err error
Expand Down Expand Up @@ -678,7 +681,7 @@ func (m *Manager) deleteLoadBalancer(ns, name string) error {

for _, lb := range lbEntry.LbModelList {
var errChList []chan error
for _, loxiClient := range m.loxiClients {
for _, loxiClient := range m.LoxiClients {
ch := make(chan error)
errChList = append(errChList, ch)

Expand Down Expand Up @@ -725,7 +728,7 @@ func (m *Manager) DeleteAllLoadBalancer() {

for _, lb := range lbEntry.LbModelList {
var errChList []chan error
for _, loxiClient := range m.loxiClients {
for _, loxiClient := range m.LoxiClients {
ch := make(chan error)
errChList = append(errChList, ch)

Expand Down Expand Up @@ -1047,7 +1050,7 @@ loop:
}
}
if !isSuccess {
klog.Exit("restart loxi-ccm")
klog.Exit("restart kube-loxilb")
}
}
}
Expand Down
37 changes: 27 additions & 10 deletions pkg/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@ import (
)

type LoxiClient struct {
restClient *RESTClient
RestClient *RESTClient
MasterLB bool
PeeringOnly bool
Url string
Stop chan struct{}
}

// apiServer is string. what format? http://10.0.0.1 or 10.0.0.1
func NewLoxiClient(apiServer string) (*LoxiClient, error) {
func NewLoxiClient(apiServer string, aliveCh chan *LoxiClient, peerOnly bool) (*LoxiClient, error) {
fmt.Println("NewLoxiClient:")
client := &http.Client{}

Expand All @@ -32,28 +36,41 @@ func NewLoxiClient(apiServer string) (*LoxiClient, error) {
return nil, err
}

return &LoxiClient{
restClient: restClient,
}, nil
stop := make(chan struct{})

lc := &LoxiClient{
RestClient: restClient,
Url: apiServer,
Stop: stop,
PeeringOnly: peerOnly,
}

lc.StartLoxiHealthCheckChan(aliveCh)

return lc, nil
}

func (l *LoxiClient) SetLoxiHealthCheckChan(stop <-chan struct{}, aliveCh chan *LoxiClient) {
func (l *LoxiClient) StartLoxiHealthCheckChan(aliveCh chan *LoxiClient) {
isLoxiAlive := true

go wait.Until(func() {
if _, err := l.HealthCheck().Get(context.Background(), ""); err != nil {
if isLoxiAlive {
klog.Infof("LoxiHealthCheckChan: loxilb(%s) is down. isLoxiAlive is changed to 'false'", l.restClient.baseURL.String())
klog.Infof("LoxiHealthCheckChan: loxilb(%s) is down", l.RestClient.baseURL.String())
isLoxiAlive = false
}
} else {
if !isLoxiAlive {
klog.Infof("LoxiHealthCheckChan: loxilb(%s) is alive again. isLoxiAlive is set 'true'", l.restClient.baseURL.String())
klog.Infof("LoxiHealthCheckChan: loxilb(%s) is alive", l.RestClient.baseURL.String())
isLoxiAlive = true
aliveCh <- l
}
}
}, time.Second*2, stop)
}, time.Second*2, l.Stop)
}

func (l *LoxiClient) StopLoxiHealthCheckChan() {
l.Stop <- struct{}{}
}

func (l *LoxiClient) LoadBalancer() *LoadBalancerAPI {
Expand All @@ -69,5 +86,5 @@ func (l *LoxiClient) GetRESTClient() *RESTClient {
return nil
}

return l.restClient
return l.RestClient
}

0 comments on commit e13e4d5

Please sign in to comment.