
Commit 1be65fb

change priority from 0 to 6
Signed-off-by: derekwin <[email protected]>
1 parent ffc97e8 commit 1be65fb

7 files changed: +56 -40 lines changed

bpf/kmesh/workload/include/service.h

Lines changed: 8 additions & 8 deletions
@@ -20,9 +20,9 @@ static inline int lb_random_handle(struct kmesh_context *kmesh_ctx, __u32 servic
     int rand_k = 0;

     endpoint_k.service_id = service_id;
-    endpoint_k.prio = MAX_PRIO; // for random handle, all endpoints are saved in MAX_PRIO
+    endpoint_k.prio = MIN_PRIO; // for random handle, all endpoints are saved in MIN_PRIO

-    rand_k = bpf_get_prandom_u32() % service_v->prio_endpoint_count[MAX_PRIO] + 1;
+    rand_k = bpf_get_prandom_u32() % service_v->prio_endpoint_count[MIN_PRIO] + 1;
     endpoint_k.backend_index = rand_k;

     endpoint_v = map_lookup_endpoint(&endpoint_k);
@@ -51,19 +51,19 @@ lb_locality_failover_handle(struct kmesh_context *kmesh_ctx, __u32 service_id, s
     endpoint_k.service_id = service_id;

     // #pragma unroll
-    for (int match_rank = MAX_PRIO; match_rank >= 0; match_rank--) {
-        endpoint_k.prio = match_rank; // 6->0
+    for (int match_prio = 0; match_prio < PRIO_COUNT; match_prio++) {
+        endpoint_k.prio = match_prio; // 0->6
         // if we have endpoints in this prio
-        if (service_v->prio_endpoint_count[match_rank] > 0) {
-            rand_k = bpf_get_prandom_u32() % service_v->prio_endpoint_count[match_rank] + 1;
+        if (service_v->prio_endpoint_count[match_prio] > 0) {
+            rand_k = bpf_get_prandom_u32() % service_v->prio_endpoint_count[match_prio] + 1;
             if (rand_k >= MAP_SIZE_OF_BACKEND) {
                 return -ENOENT;
             }
             endpoint_k.backend_index = rand_k;
             endpoint_v = map_lookup_endpoint(&endpoint_k);
             if (!endpoint_v) {
                 BPF_LOG(
-                    ERR, SERVICE, "find endpoint [%u/%u/%u] failed", service_id, match_rank, endpoint_k.backend_index);
+                    ERR, SERVICE, "find endpoint [%u/%u/%u] failed", service_id, match_prio, endpoint_k.backend_index);
                 return -ENOENT;
             }
             ret = endpoint_manager(kmesh_ctx, endpoint_v, service_id, service_v);
@@ -74,7 +74,7 @@ lb_locality_failover_handle(struct kmesh_context *kmesh_ctx, __u32 service_id, s
             }
             return 0; // find the backend successfully
         }
-        if (is_strict && match_rank == service_v->lb_strict_index) { // only match lb strict index
+        if (is_strict && match_prio == service_v->lb_strict_prio) { // only match lb strict prio
             return -ENOENT;
         }
     }
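
Taken together, these two hunks invert the failover walk: the loop now scans prio buckets from 0 (highest priority) up to 6 (lowest) instead of 6 down to 0, and strict mode gives up once the bucket at lb_strict_prio has been tried without a hit. Below is a minimal Go sketch of the same selection logic; the service struct and pickBackend helper are hypothetical stand-ins for the BPF service/endpoint maps, not code from this commit.

package main

import (
	"errors"
	"fmt"
	"math/rand"
)

const (
	MinPrio   = 6 // lowest-priority bucket; random LB stores everything here
	PrioCount = 7 // buckets 0..6
)

// service is a hypothetical stand-in for the BPF service map value.
type service struct {
	prioEndpointCount [PrioCount]uint32
	lbStrictPrio      uint32
}

// pickBackend mirrors lb_locality_failover_handle: scan prio 0 -> 6 and draw a
// random 1-based backend index from the first non-empty bucket; in strict mode,
// stop once the strict-prio bucket has been checked and found empty.
func pickBackend(sv *service, isStrict bool) (prio, index uint32, err error) {
	for p := uint32(0); p < PrioCount; p++ {
		if n := sv.prioEndpointCount[p]; n > 0 {
			return p, rand.Uint32()%n + 1, nil
		}
		if isStrict && p == sv.lbStrictPrio {
			return 0, 0, errors.New("strict mode: no endpoint at or above strict prio")
		}
	}
	return 0, 0, errors.New("no endpoint in any prio bucket")
}

func main() {
	sv := &service{lbStrictPrio: 3}
	sv.prioEndpointCount[4] = 2 // endpoints exist only below the strict cutoff
	if _, _, err := pickBackend(sv, true); err != nil {
		fmt.Println(err) // the cutoff at prio 3 fires before bucket 4 is reached
	}
}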

bpf/kmesh/workload/include/workload.h

Lines changed: 7 additions & 7 deletions
@@ -10,8 +10,8 @@
 #define MAX_PORT_COUNT 10
 #define MAX_SERVICE_COUNT 10
 #define RINGBUF_SIZE (1 << 12)
-#define MAX_PRIO 6
-#define MAX_PRIO_COUNT MAX_PRIO + 1 // 6 means match all scope, 0 means match nothing
+#define MIN_PRIO 6
+#define PRIO_COUNT 7
 #define MAX_MEMBER_NUM_PER_POLICY 4

 #pragma pack(1)
@@ -30,10 +30,10 @@ typedef struct {
 } service_key;

 typedef struct {
-    __u32 prio_endpoint_count[MAX_PRIO_COUNT]; // endpoint count of current service with prio, prio from 6->0
-    __u32 lb_policy; // load balancing algorithm, currently supports random algorithm, locality loadbalance
-    // Failover/strict mode
-    __u32 lb_strict_index; // for failover strict mode
+    __u32 prio_endpoint_count[PRIO_COUNT]; // endpoint count of current service with prio
+    __u32 lb_policy; // load balancing algorithm, currently supports random algorithm, locality loadbalance
+    // Failover/strict mode
+    __u32 lb_strict_prio; // for failover strict mode
     __u32 service_port[MAX_PORT_COUNT]; // service_port[i] and target_port[i] are a pair, i starts from 0 and max value
                                         // is MAX_PORT_COUNT-1
     __u32 target_port[MAX_PORT_COUNT];
@@ -44,7 +44,7 @@ typedef struct {
 // endpoint map
 typedef struct {
     __u32 service_id; // service id
-    __u32 prio; // prio means rank, 6 means match all, and 0 means match nothing
+    __u32 prio; // 0 means highest prio (match all scopes), 6 means lowest prio
     __u32 backend_index; // if endpoint_count = 3, then backend_index = 0/1/2
 } endpoint_key;
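
The endpoint map stays keyed by (service_id, prio, backend_index); only the meaning of the prio field flips, with 0 now the best bucket. A short Go sketch of what keys look like under the two LB modes; EndpointKey mirrors the C struct above, while the Go field names and concrete values are illustrative.

package main

import "fmt"

const MinPrio = 6 // prios run 0 (highest) .. 6 (lowest)

// EndpointKey mirrors the C endpoint_key; Go field names are approximate.
type EndpointKey struct {
	ServiceId    uint32 // service id
	Prio         uint32 // 0 = highest prio (match all scopes), 6 = lowest
	BackendIndex uint32 // 1-based index within the prio bucket
}

func main() {
	// Random LB: every endpoint of the service lives in the MinPrio bucket.
	randomKey := EndpointKey{ServiceId: 1, Prio: MinPrio, BackendIndex: 1}

	// Locality LB: e.g. a workload matching all three configured scopes
	// lands at prio = 6 - 3 = 3, ahead of any partial match (4, 5, 6).
	localityKey := EndpointKey{ServiceId: 1, Prio: 3, BackendIndex: 1}

	fmt.Printf("%+v\n%+v\n", randomKey, localityKey)
}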

pkg/controller/workload/bpfcache/endpoint.go

Lines changed: 2 additions & 2 deletions
@@ -22,8 +22,8 @@ import (
 )

 const (
-	MaxPrio    = 6
-	MaxPrioNum = 7
+	MinPrio   = 6
+	PrioCount = 7
 )

 type EndpointKey struct {

pkg/controller/workload/bpfcache/locality_cache.go

Lines changed: 19 additions & 3 deletions
@@ -1,3 +1,19 @@
+/*
+ * Copyright The Kmesh Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package bpfcache

 import (
@@ -59,7 +75,7 @@ func (l *localityInfo) IsSet(param uint32) bool {
 type LocalityCache struct {
 	LbPolicy               uint32
 	localityInfo           localityInfo
-	LbStrictIndex          uint32 // for failover strict mode
+	LbStrictPrio           uint32 // for failover strict mode
 	isLocalityInfoSet      bool
 	RoutingPreference      []workloadapi.LoadBalancing_Scope
 	isRoutingPreferenceSet bool
@@ -92,7 +108,7 @@ func (l *LocalityCache) SetRoutingPreference(s []workloadapi.LoadBalancing_Scope
 	// notice: s should set by lb.GetRoutingPreference()
 	if len(s) > 0 {
 		l.RoutingPreference = s
-		l.LbStrictIndex = uint32(len(s))
+		l.LbStrictPrio = uint32(MinPrio - len(s))
 		l.isRoutingPreferenceSet = true
 	}
 }
@@ -138,7 +154,7 @@ func (l *LocalityCache) CalcuLocalityLBPrio(wl *workloadapi.Workload) uint32 {
 			}
 		}
 	}
-	return rank
+	return MinPrio - rank
 }

 func (l *LocalityCache) SaveToWaitQueue(wl *workloadapi.Workload) {
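
These two one-line changes carry the inversion on the control-plane side: CalcuLocalityLBPrio maps a match rank (how many routing-preference scopes a workload satisfies) to prio = MinPrio - rank, so more matches yield a smaller prio number, and SetRoutingPreference pins the strict-mode cutoff at the prio of a full match. A worked example, assuming a routing preference with three scopes; prioFromRank is a hypothetical helper for illustration.

package main

import "fmt"

const MinPrio = 6

// prioFromRank is the new mapping in CalcuLocalityLBPrio: rank counts matched
// scopes, and a higher rank yields a smaller (better) prio number.
func prioFromRank(rank int) int { return MinPrio - rank }

func main() {
	scopes := 3 // illustrative: a three-scope routing preference

	// SetRoutingPreference: strict mode only accepts a full match.
	lbStrictPrio := MinPrio - scopes // 6 - 3 = 3

	for rank := 0; rank <= scopes; rank++ {
		fmt.Printf("matched %d of %d scopes -> prio %d\n", rank, scopes, prioFromRank(rank))
	}
	// matched 0 of 3 scopes -> prio 6 (lowest)
	// matched 3 of 3 scopes -> prio 3, which equals the strict cutoff
	fmt.Println("strict prio:", lbStrictPrio)
}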

pkg/controller/workload/bpfcache/service.go

Lines changed: 3 additions & 3 deletions
@@ -32,9 +32,9 @@ type ServicePorts [MaxPortNum]uint32
 type TargetPorts [MaxPortNum]uint32

 type ServiceValue struct {
-	EndpointCount [MaxPrioNum]uint32 // endpoint count of current service
-	LbPolicy      uint32             // load balancing algorithm, currently only supports random algorithm
-	LbStrictIndex uint32
+	EndpointCount [PrioCount]uint32 // endpoint count of current service
+	LbPolicy      uint32            // load balancing algorithm, currently only supports random algorithm
+	LbStrictPrio  uint32
 	ServicePort   ServicePorts // ServicePort[i] and TargetPort[i] are a pair, i starts from 0 and max value is MaxPortNum-1
 	TargetPort    TargetPorts
 	WaypointAddr  [16]byte

pkg/controller/workload/workload_processor.go

Lines changed: 5 additions & 5 deletions
@@ -278,7 +278,7 @@ func (p *Processor) removeServiceResourceFromBpfMap(svc *workloadapi.Service, na
 	}

 	var i uint32
-	for j := 0; j <= bpf.MaxPrio; j++ {
+	for j := 0; j < bpf.PrioCount; j++ {
 		if svDelete.EndpointCount[j] == 0 {
 			continue
 		}
@@ -352,7 +352,7 @@ func (p *Processor) handleWorkloadNewBoundServices(workload *workloadapi.Workloa
 	// the service already stored in map, add endpoint
 	if err = p.bpf.ServiceLookup(&sk, &sv); err == nil {
 		if sv.LbPolicy == LbPolicyRandom { // random mode
-			if err = p.addWorkloadToService(&sk, &sv, workloadId, bpf.MaxPrio); err != nil { // In random mode, we save all workloads to maxprio
+			if err = p.addWorkloadToService(&sk, &sv, workloadId, bpf.MinPrio); err != nil { // In random mode, we save all workloads to minprio
 				log.Errorf("addWorkloadToService workload %d service %d failed: %v", workloadId, sk.ServiceId, err)
 				return err
 			}
@@ -363,7 +363,7 @@ func (p *Processor) handleWorkloadNewBoundServices(workload *workloadapi.Workloa
 			log.Errorf("addWorkloadToService workload %d service %d failed: %v", workloadId, sk.ServiceId, err)
 			return err
 		}
-	} else { // locality LB mode, but we need to set up all localityCache fields before add endpoint
+	} else { // locality LB mode, but we need to set up all localityCache fields before adding endpoint
 		p.locality.SaveToWaitQueue(workload)
 	}
 }
@@ -507,8 +507,8 @@ func (p *Processor) storeServiceData(serviceName string, waypoint *workloadapi.G
 	newValue.LbPolicy = uint32(lb.GetMode()) // set loadbalance mode
 	p.locality.SetRoutingPreference(lb.GetRoutingPreference())
 	p.locality.LbPolicy = newValue.LbPolicy
-	log.Debugf("lbPolicy:%v, routingPreference:%v, strictIndex:%v", newValue.LbPolicy, p.locality.RoutingPreference, p.locality.LbStrictIndex)
-	newValue.LbStrictIndex = p.locality.LbStrictIndex
+	log.Debugf("lbPolicy:%v, routingPreference:%v, strictPrio:%v", newValue.LbPolicy, p.locality.RoutingPreference, p.locality.LbStrictPrio)
+	newValue.LbStrictPrio = p.locality.LbStrictPrio

 	if waypoint != nil && waypoint.GetAddress() != nil {
 		nets.CopyIpByteFromSlice(&newValue.WaypointAddr, waypoint.GetAddress().Address)
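
Since locality LB can scatter a service's endpoints across several prio buckets, removal has to visit all of them, hence the switch to the 0 <= j < PrioCount loop bound above. A compact Go sketch of that cleanup pass; removeAllEndpoints is a hypothetical stand-in for the map-walking code, and the delete step is only printed here.

package main

import "fmt"

const PrioCount = 7 // prio buckets 0..6

// removeAllEndpoints visits every prio bucket, skipping empty ones, the way
// removeServiceResourceFromBpfMap now iterates. Deletion is simulated.
func removeAllEndpoints(endpointCount [PrioCount]uint32) {
	for j := 0; j < PrioCount; j++ {
		if endpointCount[j] == 0 {
			continue // nothing stored at this prio
		}
		for i := uint32(1); i <= endpointCount[j]; i++ {
			fmt.Printf("delete endpoint key {prio: %d, backend_index: %d}\n", j, i)
		}
	}
}

func main() {
	var counts [PrioCount]uint32
	counts[3] = 1 // locality LB may fill high-priority buckets ...
	counts[6] = 2 // ... while random LB keeps everything at MinPrio (6)
	removeAllEndpoints(counts)
}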

pkg/controller/workload/workload_processor_test.go

Lines changed: 12 additions & 12 deletions
@@ -65,7 +65,7 @@ func Test_handleWorkload(t *testing.T) {
 	svcID := checkFrontEndMap(t, fakeSvc.Addresses[0].Address, p)

 	// 2.2 check service map contains service
-	checkServiceMap(t, p, svcID, fakeSvc, bpfcache.MaxPrio, 1)
+	checkServiceMap(t, p, svcID, fakeSvc, bpfcache.MinPrio, 1)

 	// 2.3 check endpoint map now contains the workloads
 	ek.BackendIndex = 1
@@ -88,7 +88,7 @@ func Test_handleWorkload(t *testing.T) {
 	assert.Equal(t, ev.BackendUid, workload2ID)

 	// 3.2 check service map contains service
-	checkServiceMap(t, p, svcID, fakeSvc, bpfcache.MaxPrio, 2)
+	checkServiceMap(t, p, svcID, fakeSvc, bpfcache.MinPrio, 2)

 	// 4 modify workload2 attribute not related with services
 	workload2.Waypoint = &workloadapi.GatewayAddress{
@@ -113,7 +113,7 @@ func Test_handleWorkload(t *testing.T) {
 	assert.Equal(t, ev.BackendUid, workload2ID)

 	// 4.2 check service map contains service
-	checkServiceMap(t, p, svcID, fakeSvc, bpfcache.MaxPrio, 2)
+	checkServiceMap(t, p, svcID, fakeSvc, bpfcache.MinPrio, 2)

 	// 4.3 check backend map contains waypoint
 	checkBackendMap(t, p, workload2ID, workload2)
@@ -125,7 +125,7 @@ func Test_handleWorkload(t *testing.T) {
 	assert.NoError(t, err)

 	// 5.1 check service map
-	checkServiceMap(t, p, svcID, fakeSvc, bpfcache.MaxPrio, 1)
+	checkServiceMap(t, p, svcID, fakeSvc, bpfcache.MinPrio, 1)

 	// 5.2 check endpoint map
 	ek.BackendIndex = 1
@@ -141,7 +141,7 @@ func Test_handleWorkload(t *testing.T) {
 	// 6.1 check front end map contains service
 	svcID = checkFrontEndMap(t, wpSvc.Addresses[0].Address, p)
 	// 6.2 check service map contains service, but no waypoint address
-	checkServiceMap(t, p, svcID, wpSvc, bpfcache.MaxPrio, 0)
+	checkServiceMap(t, p, svcID, wpSvc, bpfcache.MinPrio, 0)

 	// 7. test add unhealthy workload
 	workload3 := createFakeWorkload("1.2.3.7", workloadapi.NetworkMode_STANDARD)
@@ -525,9 +525,9 @@ func TestRestart(t *testing.T) {
 	assert.Equal(t, 6, p.bpf.FrontendCount())
 	// check service map
 	t.Log("1. check service map")
-	checkServiceMap(t, p, p.hashName.Hash(svc1.ResourceName()), svc1, bpfcache.MaxPrio, 1)
-	checkServiceMap(t, p, p.hashName.Hash(svc2.ResourceName()), svc2, bpfcache.MaxPrio, 2)
-	checkServiceMap(t, p, p.hashName.Hash(svc3.ResourceName()), svc3, bpfcache.MaxPrio, 2)
+	checkServiceMap(t, p, p.hashName.Hash(svc1.ResourceName()), svc1, bpfcache.MinPrio, 1)
+	checkServiceMap(t, p, p.hashName.Hash(svc2.ResourceName()), svc2, bpfcache.MinPrio, 2)
+	checkServiceMap(t, p, p.hashName.Hash(svc3.ResourceName()), svc3, bpfcache.MinPrio, 2)
 	assert.Equal(t, 3, p.bpf.ServiceCount())
 	// check endpoint map
 	t.Log("1. check endpoint map")
@@ -603,10 +603,10 @@ func TestRestart(t *testing.T) {
 	assert.Equal(t, 7, p.bpf.FrontendCount())

 	// check service map
-	checkServiceMap(t, p, p.hashName.Hash(svc1.ResourceName()), svc1, bpfcache.MaxPrio, 2) // svc1 has 2 wl1, wl2
-	checkServiceMap(t, p, p.hashName.Hash(svc2.ResourceName()), svc2, bpfcache.MaxPrio, 1) // svc2 has 1 wl2
-	checkServiceMap(t, p, p.hashName.Hash(svc3.ResourceName()), svc3, bpfcache.MaxPrio, 1) // svc3 has 1 wl2
-	checkServiceMap(t, p, p.hashName.Hash(svc4.ResourceName()), svc4, bpfcache.MaxPrio, 1) // svc4 has 1 wl4
+	checkServiceMap(t, p, p.hashName.Hash(svc1.ResourceName()), svc1, bpfcache.MinPrio, 2) // svc1 has 2 wl1, wl2
+	checkServiceMap(t, p, p.hashName.Hash(svc2.ResourceName()), svc2, bpfcache.MinPrio, 1) // svc2 has 1 wl2
+	checkServiceMap(t, p, p.hashName.Hash(svc3.ResourceName()), svc3, bpfcache.MinPrio, 1) // svc3 has 1 wl2
+	checkServiceMap(t, p, p.hashName.Hash(svc4.ResourceName()), svc4, bpfcache.MinPrio, 1) // svc4 has 1 wl4
 	assert.Equal(t, 4, p.bpf.ServiceCount())
 	// check endpoint map
 	checkEndpointMap(t, p, svc1, []uint32{p.hashName.Hash(wl1.ResourceName()), p.hashName.Hash(wl2.ResourceName())})
