Skip to content

Commit 2346ed3

Browse files
committed
fix issues
Signed-off-by: derekwin <[email protected]>
1 parent 94225bd commit 2346ed3

File tree

4 files changed

+43
-81
lines changed

4 files changed

+43
-81
lines changed

bpf/kmesh/workload/include/service.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,6 @@ lb_locality_failover_handle(struct kmesh_context *kmesh_ctx, __u32 service_id, s
6161
// if we have endpoints in this prio
6262
if (service_v->prio_endpoint_count[match_prio] > 0) {
6363
rand_k = bpf_get_prandom_u32() % service_v->prio_endpoint_count[match_prio] + 1;
64-
if (rand_k >= MAP_SIZE_OF_BACKEND) {
65-
return -ENOENT;
66-
}
6764
endpoint_k.backend_index = rand_k;
6865
endpoint_v = map_lookup_endpoint(&endpoint_k);
6966
if (!endpoint_v) {

pkg/controller/workload/bpfcache/locality_cache.go

Lines changed: 26 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -75,90 +75,65 @@ func (l *localityInfo) IsSet(param uint32) bool {
7575
}
7676

7777
type LocalityCache struct {
78-
mutex sync.RWMutex
79-
localityInfo localityInfo
80-
isLocalityInfoSet bool
81-
workloadWaitQueue map[string]struct{} // workload.GetUid()
78+
mutex sync.RWMutex
79+
LocalityInfo *localityInfo
8280
}
8381

8482
func NewLocalityCache() LocalityCache {
8583
return LocalityCache{
86-
localityInfo: localityInfo{},
87-
isLocalityInfoSet: false,
88-
workloadWaitQueue: make(map[string]struct{}),
84+
LocalityInfo: nil,
8985
}
9086
}
9187

9288
func (l *LocalityCache) SetLocality(nodeName, clusterId, network string, locality *workloadapi.Locality) {
93-
// notice: nodeName should set by processor or os.Getenv("NODE_NAME"),
94-
l.localityInfo.Set(nodeName, NODENAME)
95-
l.localityInfo.Set(locality.GetRegion(), REGION)
96-
l.localityInfo.Set(locality.GetSubzone(), SUBZONE)
97-
l.localityInfo.Set(locality.GetZone(), ZONE)
98-
l.localityInfo.Set(clusterId, CLUSTERID)
99-
l.localityInfo.Set(network, NETWORK)
100-
101-
l.isLocalityInfoSet = true
102-
}
89+
if l.LocalityInfo == nil {
90+
l.LocalityInfo = &localityInfo{}
91+
}
10392

104-
func (l *LocalityCache) IsLocalityInfoSet() bool {
105-
log.Debugf("isLocalityInfoSet: %#v", l.isLocalityInfoSet)
106-
return l.isLocalityInfoSet
93+
// notice: nodeName should set by processor or os.Getenv("NODE_NAME"),
94+
l.LocalityInfo.Set(nodeName, NODENAME)
95+
l.LocalityInfo.Set(locality.GetRegion(), REGION)
96+
l.LocalityInfo.Set(locality.GetSubzone(), SUBZONE)
97+
l.LocalityInfo.Set(locality.GetZone(), ZONE)
98+
l.LocalityInfo.Set(clusterId, CLUSTERID)
99+
l.LocalityInfo.Set(network, NETWORK)
107100
}
108101

109-
func (l *LocalityCache) CalcuLocalityLBPrio(wl *workloadapi.Workload, rp []workloadapi.LoadBalancing_Scope) uint32 {
102+
func (l *LocalityCache) CalcLocalityLBPrio(wl *workloadapi.Workload, rp []workloadapi.LoadBalancing_Scope) uint32 {
110103
var rank uint32 = 0
111104
for _, scope := range rp {
112105
switch scope {
113106
case workloadapi.LoadBalancing_REGION:
114-
log.Debugf("l.localityInfo.IsSet(REGION) %#v, Valid(wl.GetLocality().GetRegion()) %#v, l.localityInfo.region %#v, wl.GetLocality().GetRegion() %#v", l.localityInfo.IsSet(REGION), Valid(wl.GetLocality().GetRegion()), l.localityInfo.region, wl.GetLocality().GetRegion())
115-
if l.localityInfo.IsSet(REGION) && Valid(wl.GetLocality().GetRegion()) && l.localityInfo.region == wl.GetLocality().GetRegion() {
107+
log.Debugf("l.LocalityInfo.region %#v, wl.GetLocality().GetRegion() %#v", l.LocalityInfo.region, wl.GetLocality().GetRegion())
108+
if l.LocalityInfo.region == wl.GetLocality().GetRegion() {
116109
rank++
117110
}
118111
case workloadapi.LoadBalancing_ZONE:
119-
log.Debugf("l.localityInfo.IsSet(ZONE) %#v, Valid(wl.GetLocality().GetZone()) %#v, l.localityInfo.zone %#v, wl.GetLocality().GetZone() %#v", l.localityInfo.IsSet(ZONE), Valid(wl.GetLocality().GetZone()), l.localityInfo.zone, wl.GetLocality().GetZone())
120-
if l.localityInfo.IsSet(ZONE) && Valid(wl.GetLocality().GetZone()) && l.localityInfo.zone == wl.GetLocality().GetZone() {
112+
log.Debugf("l.LocalityInfo.zone %#v, wl.GetLocality().GetZone() %#v", l.LocalityInfo.zone, wl.GetLocality().GetZone())
113+
if l.LocalityInfo.zone == wl.GetLocality().GetZone() {
121114
rank++
122115
}
123116
case workloadapi.LoadBalancing_SUBZONE:
124-
log.Debugf("l.localityInfo.IsSet(SUBZONE) %#v, Valid(wl.GetLocality().GetSubzone()) %#v, l.localityInfo.subZone %#v, wl.GetLocality().GetSubzone() %#v", l.localityInfo.IsSet(SUBZONE), Valid(wl.GetLocality().GetSubzone()), l.localityInfo.subZone, wl.GetLocality().GetSubzone())
125-
if l.localityInfo.IsSet(SUBZONE) && Valid(wl.GetLocality().GetSubzone()) && l.localityInfo.subZone == wl.GetLocality().GetSubzone() {
117+
log.Debugf("l.LocalityInfo.subZone %#v, wl.GetLocality().GetSubzone() %#v", l.LocalityInfo.subZone, wl.GetLocality().GetSubzone())
118+
if l.LocalityInfo.subZone == wl.GetLocality().GetSubzone() {
126119
rank++
127120
}
128121
case workloadapi.LoadBalancing_NODE:
129-
log.Debugf("l.localityInfo.IsSet(NODENAME) %#v, Valid(wl.GetNode()) %#v, l.localityInfo.nodeName %#v, wl.GetNode() %#v", l.localityInfo.IsSet(NODENAME), Valid(wl.GetNode()), l.localityInfo.nodeName, wl.GetNode())
130-
if l.localityInfo.IsSet(NODENAME) && Valid(wl.GetNode()) && l.localityInfo.nodeName == wl.GetNode() {
122+
log.Debugf("l.LocalityInfo.nodeName %#v, wl.GetNode() %#v", l.LocalityInfo.nodeName, wl.GetNode())
123+
if l.LocalityInfo.nodeName == wl.GetNode() {
131124
rank++
132125
}
133126
case workloadapi.LoadBalancing_NETWORK:
134-
log.Debugf("l.localityInfo.IsSet(NETWORK) %#v, Valid(wl.GetNetwork()) %#v, l.localityInfo.network %#v, wl.GetNetwork() %#v", l.localityInfo.IsSet(NETWORK), Valid(wl.GetNetwork()), l.localityInfo.network, wl.GetNetwork())
135-
if l.localityInfo.IsSet(NETWORK) && Valid(wl.GetNetwork()) && l.localityInfo.network == wl.GetNetwork() {
127+
log.Debugf("l.LocalityInfo.network %#v, wl.GetNetwork() %#v", l.LocalityInfo.network, wl.GetNetwork())
128+
if l.LocalityInfo.network == wl.GetNetwork() {
136129
rank++
137130
}
138131
case workloadapi.LoadBalancing_CLUSTER:
139-
log.Debugf("l.localityInfo.IsSet(CLUSTERID) %#v, Valid(wl.GetClusterId()) %#v, l.localityInfo.clusterId %#v, wl.GetClusterId() %#v", l.localityInfo.IsSet(CLUSTERID), Valid(wl.GetClusterId()), l.localityInfo.clusterId, wl.GetClusterId())
140-
if l.localityInfo.IsSet(CLUSTERID) && Valid(wl.GetClusterId()) && l.localityInfo.clusterId == wl.GetClusterId() {
132+
log.Debugf("l.LocalityInfo.clusterId %#v, wl.GetClusterId() %#v", l.LocalityInfo.clusterId, wl.GetClusterId())
133+
if l.LocalityInfo.clusterId == wl.GetClusterId() {
141134
rank++
142135
}
143136
}
144137
}
145138
return uint32(len(rp)) - rank
146139
}
147-
148-
func (l *LocalityCache) SaveToWaitQueue(wl *workloadapi.Workload) {
149-
l.mutex.Lock()
150-
defer l.mutex.Unlock()
151-
l.workloadWaitQueue[wl.Uid] = struct{}{}
152-
}
153-
154-
func (l *LocalityCache) DelWorkloadFromWaitQueue(wl *workloadapi.Workload) {
155-
l.mutex.Lock()
156-
defer l.mutex.Unlock()
157-
delete(l.workloadWaitQueue, wl.Uid)
158-
}
159-
160-
func (l *LocalityCache) GetFromWaitQueue() map[string]struct{} {
161-
l.mutex.Lock()
162-
defer l.mutex.Unlock()
163-
return l.workloadWaitQueue
164-
}

pkg/controller/workload/workload_processor.go

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -358,15 +358,13 @@ func (p *Processor) handleWorkloadNewBoundServices(workload *workloadapi.Workloa
358358
return err
359359
}
360360
} else { // locality mode
361-
if p.locality.IsLocalityInfoSet() {
362-
service := p.ServiceCache.GetService(p.hashName.NumToStr(svcUid))
363-
prio := p.locality.CalcuLocalityLBPrio(workload, service.LoadBalancing.GetRoutingPreference())
361+
service := p.ServiceCache.GetService(p.hashName.NumToStr(svcUid))
362+
if p.locality.LocalityInfo != nil && service != nil {
363+
prio := p.locality.CalcLocalityLBPrio(workload, service.LoadBalancing.GetRoutingPreference())
364364
if err, _ = p.addWorkloadToService(&sk, &sv, workloadId, prio); err != nil {
365365
log.Errorf("addWorkloadToService workload %d service %d priority %d failed: %v", workloadId, sk.ServiceId, prio, err)
366366
return err
367367
}
368-
} else { // locality LB mode, but we need to set up all localityCache fields before adding endpoint
369-
p.locality.SaveToWaitQueue(workload)
370368
}
371369
}
372370
}
@@ -427,7 +425,7 @@ func (p *Processor) handleWorkload(workload *workloadapi.Workload) error {
427425
p.storeWorkloadPolicies(workload.GetUid(), workload.GetAuthorizationPolicies())
428426

429427
// update kmesh localityCache
430-
if !p.locality.IsLocalityInfoSet() && p.nodeName == workload.GetNode() { // todo
428+
if p.locality.LocalityInfo == nil && p.nodeName == workload.GetNode() { // todo
431429
p.locality.SetLocality(p.nodeName, workload.GetClusterId(), workload.GetNetwork(), workload.GetLocality())
432430
}
433431

@@ -496,6 +494,9 @@ func (p *Processor) storeServiceFrontendData(serviceId uint32, service *workload
496494
return nil
497495
}
498496

497+
// toLLb indicates whether we are performing a locality load balance update.
498+
// If toLLb is true, it means we need to calculate priority; otherwise,
499+
// it represents a random strategy, in which case we just set the priority to 0.
499500
func (p *Processor) updateEndpoint(serviceId uint32, toLLb bool) error {
500501
var (
501502
err error
@@ -522,20 +523,20 @@ func (p *Processor) updateEndpoint(serviceId uint32, toLLb bool) error {
522523

523524
sKey := bpf.ServiceKey{ServiceId: serviceId}
524525
sValue := bpf.ServiceValue{}
526+
if err = p.bpf.ServiceLookup(&sKey, &sValue); err != nil {
527+
log.Errorf("Lookup service %d failed: %v", sKey.ServiceId, err)
528+
}
529+
525530
for _, uid := range backendUids {
526531
// add new endpoint
527532
if toLLb {
528533
service := p.ServiceCache.GetService(p.hashName.NumToStr(serviceId))
529534
workload := p.WorkloadCache.GetWorkloadByUid(p.hashName.NumToStr(uid))
530-
prio = p.locality.CalcuLocalityLBPrio(workload, service.LoadBalancing.GetRoutingPreference())
535+
prio = p.locality.CalcLocalityLBPrio(workload, service.LoadBalancing.GetRoutingPreference())
531536
} else {
532537
prio = 0 // to random
533538
}
534539

535-
if err = p.bpf.ServiceLookup(&sKey, &sValue); err != nil {
536-
log.Errorf("Lookup service %d failed: %v", sKey.ServiceId, err)
537-
}
538-
539540
if err, _ = p.addWorkloadToService(&sKey, &sValue, uid, prio); err != nil {
540541
log.Errorf("addWorkloadToService workload %d service %d failed: %v", uid, sKey.ServiceId, err)
541542
return err
@@ -717,17 +718,6 @@ func (p *Processor) handleAddressTypeResponse(rsp *service_discovery_v3.DeltaDis
717718
}
718719
}
719720

720-
// Add new services associated with the workload
721-
for wUid := range p.locality.GetFromWaitQueue() { // locality LB mode
722-
workload := p.WorkloadCache.GetWorkloadByUid(wUid)
723-
_, newServices := p.compareWorkloadServices(workload)
724-
if err := p.handleWorkloadNewBoundServices(workload, newServices); err != nil {
725-
log.Errorf("handleWorkloadNewBoundServices %s failed: %v", workload.ResourceName(), err)
726-
return err
727-
}
728-
p.locality.DelWorkloadFromWaitQueue(workload)
729-
}
730-
731721
p.handleRemovedAddresses(rsp.RemovedResources)
732722
p.once.Do(p.handleRemovedAddressesDuringRestart)
733723
return err

pkg/controller/workload/workload_processor_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -674,7 +674,7 @@ func serviceToAddress(service *workloadapi.Service) *workloadapi.Address {
674674
}
675675
}
676676

677-
func TestCalcuLocalityLbPrio(t *testing.T) {
677+
func TestCalcLocalityLbPrio(t *testing.T) {
678678
workloadMap := bpfcache.NewFakeWorkloadMap(t)
679679
defer bpfcache.CleanupFakeWorkloadMap(workloadMap)
680680

@@ -693,10 +693,10 @@ func TestCalcuLocalityLbPrio(t *testing.T) {
693693
wl2 := createWorkload("wl2", "10.244.0.2", os.Getenv("NODE_NAME"), workloadapi.NetworkMode_STANDARD, createLocality("r1", "z1", "s2"), "svc1") // prio 1
694694
wl3 := createWorkload("wl3", "10.244.0.3", os.Getenv("NODE_NAME"), workloadapi.NetworkMode_STANDARD, createLocality("r1", "z2", "s2"), "svc1") // prio 2
695695
wl4 := createWorkload("wl4", "10.244.0.4", os.Getenv("NODE_NAME"), workloadapi.NetworkMode_STANDARD, createLocality("r2", "z2", "s2"), "svc1") // prio 3
696-
assert.Equal(t, uint32(0), p.locality.CalcuLocalityLBPrio(wl1, llbSvc.GetLoadBalancing().GetRoutingPreference()))
697-
assert.Equal(t, uint32(1), p.locality.CalcuLocalityLBPrio(wl2, llbSvc.GetLoadBalancing().GetRoutingPreference()))
698-
assert.Equal(t, uint32(2), p.locality.CalcuLocalityLBPrio(wl3, llbSvc.GetLoadBalancing().GetRoutingPreference()))
699-
assert.Equal(t, uint32(3), p.locality.CalcuLocalityLBPrio(wl4, llbSvc.GetLoadBalancing().GetRoutingPreference()))
696+
assert.Equal(t, uint32(0), p.locality.CalcLocalityLBPrio(wl1, llbSvc.GetLoadBalancing().GetRoutingPreference()))
697+
assert.Equal(t, uint32(1), p.locality.CalcLocalityLBPrio(wl2, llbSvc.GetLoadBalancing().GetRoutingPreference()))
698+
assert.Equal(t, uint32(2), p.locality.CalcLocalityLBPrio(wl3, llbSvc.GetLoadBalancing().GetRoutingPreference()))
699+
assert.Equal(t, uint32(3), p.locality.CalcLocalityLBPrio(wl4, llbSvc.GetLoadBalancing().GetRoutingPreference()))
700700

701701
hashNameClean(p)
702702
}

0 commit comments

Comments
 (0)