64 changes: 59 additions & 5 deletions bpf/kmesh/workload/include/service.h
@@ -17,9 +17,16 @@ static inline int lb_random_handle(struct kmesh_context *kmesh_ctx, __u32 servic
int ret = 0;
endpoint_key endpoint_k = {0};
endpoint_value *endpoint_v = NULL;
int rand_k = 0;

if (service_v->prio_endpoint_count[0] == 0)
return 0;

endpoint_k.service_id = service_id;
endpoint_k.backend_index = bpf_get_prandom_u32() % service_v->endpoint_count + 1;
endpoint_k.prio = 0; // for random handling, all endpoints are stored at the highest priority

rand_k = bpf_get_prandom_u32() % service_v->prio_endpoint_count[0] + 1;
endpoint_k.backend_index = rand_k;

endpoint_v = map_lookup_endpoint(&endpoint_k);
if (!endpoint_v) {
@@ -37,6 +44,50 @@
return 0;
}

static inline int
lb_locality_failover_handle(struct kmesh_context *kmesh_ctx, __u32 service_id, service_value *service_v, bool is_strict)
{
int ret = 0;
uint32_t rand_k = 0;
endpoint_key endpoint_k = {0};
endpoint_value *endpoint_v = NULL;
endpoint_k.service_id = service_id;
struct ip_addr zero_addr = {0};
__u32 zero_port = 0;

// #pragma unroll
for (int match_prio = 0; match_prio < PRIO_COUNT; match_prio++) {
endpoint_k.prio = match_prio; // 0->6
// if we have endpoints in this prio
if (service_v->prio_endpoint_count[match_prio] > 0) {
rand_k = bpf_get_prandom_u32() % service_v->prio_endpoint_count[match_prio] + 1;
endpoint_k.backend_index = rand_k;
endpoint_v = map_lookup_endpoint(&endpoint_k);
if (!endpoint_v) {
BPF_LOG(
ERR, SERVICE, "find endpoint [%u/%u/%u] failed", service_id, match_prio, endpoint_k.backend_index);
return -ENOENT;
}
ret = endpoint_manager(kmesh_ctx, endpoint_v, service_id, service_v);
if (ret != 0) {
if (ret != -ENOENT)
BPF_LOG(ERR, SERVICE, "endpoint_manager failed, ret:%d\n", ret);
return ret;
}
BPF_LOG(DEBUG, SERVICE, "locality loadbalance matched backend_uid %d\n", endpoint_v->backend_uid);
return 0; // find the backend successfully
}
if (is_strict) { // only match max priority in strict mode
kmesh_ctx->dnat_ip = zero_addr;
kmesh_ctx->dnat_port = zero_port;
BPF_LOG(DEBUG, SERVICE, "locality loadbalance match nothing in STRICT mode\n");
return -ENOENT;
}
}
// no backend matched
return -ENOENT;
}

static inline int service_manager(struct kmesh_context *kmesh_ctx, __u32 service_id, service_value *service_v)
{
int ret = 0;
@@ -55,15 +106,18 @@ static inline int service_manager(struct kmesh_context *kmesh_ctx, __u32 service
return ret;
}

if (service_v->endpoint_count == 0) {
BPF_LOG(DEBUG, SERVICE, "service %u has no endpoint", service_id);
return 0;
}
BPF_LOG(DEBUG, SERVICE, "service [%u] policy [%u] failed", service_id, service_v->lb_policy);

switch (service_v->lb_policy) {
case LB_POLICY_RANDOM:
ret = lb_random_handle(kmesh_ctx, service_id, service_v);
break;
case LB_POLICY_STRICT:
ret = lb_locality_failover_handle(kmesh_ctx, service_id, service_v, true);
break;
case LB_POLICY_FAILOVER:
ret = lb_locality_failover_handle(kmesh_ctx, service_id, service_v, false);
break;
default:
BPF_LOG(ERR, SERVICE, "unsupported load balance type:%u\n", service_v->lb_policy);
ret = -EINVAL;
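The two handlers above share one idea: endpoints are bucketed by priority 0..6, the first non-empty bucket is chosen, and a 1-based random index inside that bucket selects the backend; strict mode only ever consults bucket 0, while failover walks the buckets from best to worst. As a reading aid, here is a minimal Go mirror of that selection logic. It is illustrative only: pickEndpoint and its signature are not part of this patch, and the sketch omits the DNAT reset the real strict path performs.

package main

import (
	"fmt"
	"math/rand"
)

const prioCount = 7

// pickEndpoint returns (prio, backendIndex) of the chosen endpoint, or an error
// when nothing matches. backendIndex is 1-based, matching the BPF map layout.
func pickEndpoint(prioEndpointCount [prioCount]uint32, strict bool) (uint32, uint32, error) {
	for prio := uint32(0); prio < prioCount; prio++ {
		if prioEndpointCount[prio] > 0 {
			// same as bpf_get_prandom_u32() % count + 1
			return prio, uint32(rand.Intn(int(prioEndpointCount[prio]))) + 1, nil
		}
		if strict { // strict mode only considers the highest-priority bucket
			return 0, 0, fmt.Errorf("no endpoint at highest priority")
		}
	}
	return 0, 0, fmt.Errorf("no endpoint at any priority")
}

func main() {
	counts := [prioCount]uint32{0, 3, 0, 0, 0, 0, 0}
	fmt.Println(pickEndpoint(counts, false)) // failover: picks from prio 1
	fmt.Println(pickEndpoint(counts, true))  // strict: fails, prio 0 is empty
}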
7 changes: 5 additions & 2 deletions bpf/kmesh/workload/include/workload.h
@@ -10,6 +10,7 @@
#define MAX_PORT_COUNT 10
#define MAX_SERVICE_COUNT 10
#define RINGBUF_SIZE (1 << 12)
#define PRIO_COUNT 7
#define MAX_MEMBER_NUM_PER_POLICY 4

#pragma pack(1)
@@ -28,8 +29,9 @@ typedef struct {
} service_key;

typedef struct {
__u32 endpoint_count; // endpoint count of current service
__u32 lb_policy; // load balancing algorithm, currently only supports random algorithm
__u32 prio_endpoint_count[PRIO_COUNT]; // per-priority endpoint count of the current service
__u32 lb_policy; // load balancing algorithm, currently supports random and locality load balancing
                 // (failover/strict mode)
__u32 service_port[MAX_PORT_COUNT]; // service_port[i] and target_port[i] are a pair, i starts from 0 and max value
// is MAX_PORT_COUNT-1
__u32 target_port[MAX_PORT_COUNT];
@@ -40,6 +42,7 @@ typedef struct {
// endpoint map
typedef struct {
__u32 service_id; // service id
__u32 prio; // 0 means highest priority (matches all scopes), 6 means lowest priority
__u32 backend_index; // if prio_endpoint_count[prio] = 3, then backend_index = 1/2/3
} endpoint_key;

2 changes: 2 additions & 0 deletions bpf/kmesh/workload/include/workload_common.h
@@ -22,6 +22,8 @@
// loadbalance type
typedef enum {
LB_POLICY_RANDOM = 0,
LB_POLICY_STRICT = 1,
LB_POLICY_FAILOVER = 2,
} lb_policy_t;

#pragma pack(1)
10 changes: 9 additions & 1 deletion pkg/controller/workload/bpfcache/endpoint.go
@@ -21,8 +21,13 @@
"istio.io/istio/pkg/util/sets"
)

const (
PrioCount = 7
)

type EndpointKey struct {
ServiceId uint32 // service id
Prio uint32 // 0 is the highest priority, PrioCount-1 the lowest
BackendIndex uint32 // if endpoint_count = 3, then backend_index = 1/2/3
}

@@ -59,15 +64,17 @@
}

// EndpointSwap removes the endpoint at currentIndex by overwriting it with the endpoint at lastIndex and deleting the lastIndex entry
func (c *Cache) EndpointSwap(currentIndex, lastIndex uint32, serviceId uint32) error {
func (c *Cache) EndpointSwap(currentIndex, lastIndex uint32, serviceId uint32, prio uint32) error {
if currentIndex == lastIndex {
return c.EndpointDelete(&EndpointKey{
ServiceId: serviceId,
Prio: prio,
BackendIndex: lastIndex,
})
}
lastKey := &EndpointKey{
ServiceId: serviceId,
Prio: prio,
BackendIndex: lastIndex,
}
lastValue := &EndpointValue{}
@@ -77,6 +84,7 @@

currentKey := &EndpointKey{
ServiceId: serviceId,
Prio: prio,
BackendIndex: currentIndex,
}
currentValue := &EndpointValue{}
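The extra prio parameter means swap-and-delete now happens inside a single priority bucket. Below is a minimal caller sketch under the assumption that the daemon keeps ServiceValue.EndpointCount in sync per bucket; only Cache.EndpointSwap and ServiceValue come from this patch, the wrapper itself is illustrative.

package example

import (
	"fmt"

	"kmesh.net/kmesh/pkg/controller/workload/bpfcache"
)

// removeEndpointAt frees slot currentIndex of the given priority bucket by
// overwriting it with the bucket's last endpoint and deleting that last slot.
func removeEndpointAt(c *bpfcache.Cache, sv *bpfcache.ServiceValue, serviceId, currentIndex, prio uint32) error {
	lastIndex := sv.EndpointCount[prio] // BackendIndex is 1-based, so the count is also the last index
	if err := c.EndpointSwap(currentIndex, lastIndex, serviceId, prio); err != nil {
		return fmt.Errorf("endpoint swap failed: %w", err)
	}
	sv.EndpointCount[prio]-- // keep the per-priority count in sync with the eBPF map
	return nil
}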
141 changes: 141 additions & 0 deletions pkg/controller/workload/bpfcache/locality_cache.go
@@ -0,0 +1,141 @@
/*
* Copyright The Kmesh Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package bpfcache

import (
"sync"

"kmesh.net/kmesh/api/v2/workloadapi"
)

const (
REGION = 1 << iota // 000001
ZONE // 000010
SUBZONE // 000100
NODENAME // 001000
CLUSTERID // 010000
NETWORK // 100000
)

type localityInfo struct {
region string // init from workload.GetLocality().GetRegion()
zone string // init from workload.GetLocality().GetZone()
subZone string // init from workload.GetLocality().GetSubZone()
nodeName string // init from os.Getenv("NODE_NAME"), workload.GetNode()
clusterId string // init from workload.GetClusterId()
network string // workload.GetNetwork()
mask uint32 // bitmask recording which of the above fields have been set
}

func Valid(s string) bool {
return s != ""
}

func (l *localityInfo) Set(s string, param uint32) {
if !Valid(s) {
return
}
switch param {
case REGION:
l.region = s
case ZONE:
l.zone = s
case SUBZONE:
l.subZone = s
case NODENAME:
l.nodeName = s
case CLUSTERID:
l.clusterId = s
case NETWORK:
l.network = s
}
l.mask |= param
}

func (l *localityInfo) Clear(param uint32) {
l.mask &= ^param
}

func (l *localityInfo) IsSet(param uint32) bool {
return l.mask&param != 0
}

type LocalityCache struct {
mutex sync.RWMutex
LocalityInfo *localityInfo
}

func NewLocalityCache() LocalityCache {
return LocalityCache{
LocalityInfo: nil,
}
}

func (l *LocalityCache) SetLocality(nodeName, clusterId, network string, locality *workloadapi.Locality) {
l.mutex.Lock()
defer l.mutex.Unlock()
if l.LocalityInfo == nil {
l.LocalityInfo = &localityInfo{}
}

// note: nodeName should be set by the processor or from os.Getenv("NODE_NAME")
l.LocalityInfo.Set(nodeName, NODENAME)
l.LocalityInfo.Set(locality.GetRegion(), REGION)
l.LocalityInfo.Set(locality.GetSubzone(), SUBZONE)
l.LocalityInfo.Set(locality.GetZone(), ZONE)
l.LocalityInfo.Set(clusterId, CLUSTERID)
l.LocalityInfo.Set(network, NETWORK)
}

func (l *LocalityCache) CalcLocalityLBPrio(wl *workloadapi.Workload, rp []workloadapi.LoadBalancing_Scope) uint32 {
var rank uint32 = 0
for _, scope := range rp {
switch scope {
case workloadapi.LoadBalancing_REGION:
log.Debugf("l.LocalityInfo.region %#v, wl.GetLocality().GetRegion() %#v", l.LocalityInfo.region, wl.GetLocality().GetRegion())
if l.LocalityInfo.region == wl.GetLocality().GetRegion() {
rank++
}
case workloadapi.LoadBalancing_ZONE:
log.Debugf("l.LocalityInfo.zone %#v, wl.GetLocality().GetZone() %#v", l.LocalityInfo.zone, wl.GetLocality().GetZone())
if l.LocalityInfo.zone == wl.GetLocality().GetZone() {
rank++
}
case workloadapi.LoadBalancing_SUBZONE:
log.Debugf("l.LocalityInfo.subZone %#v, wl.GetLocality().GetSubzone() %#v", l.LocalityInfo.subZone, wl.GetLocality().GetSubzone())
if l.LocalityInfo.subZone == wl.GetLocality().GetSubzone() {
rank++
}
case workloadapi.LoadBalancing_NODE:
log.Debugf("l.LocalityInfo.nodeName %#v, wl.GetNode() %#v", l.LocalityInfo.nodeName, wl.GetNode())
if l.LocalityInfo.nodeName == wl.GetNode() {
rank++
}
case workloadapi.LoadBalancing_NETWORK:
log.Debugf("l.LocalityInfo.network %#v, wl.GetNetwork() %#v", l.LocalityInfo.network, wl.GetNetwork())
if l.LocalityInfo.network == wl.GetNetwork() {
rank++
}
case workloadapi.LoadBalancing_CLUSTER:
log.Debugf("l.LocalityInfo.clusterId %#v, wl.GetClusterId() %#v", l.LocalityInfo.clusterId, wl.GetClusterId())
if l.LocalityInfo.clusterId == wl.GetClusterId() {
rank++
}
}
}
return uint32(len(rp)) - rank

Member:

This method is not right. Keep in mind, we should match rp []workloadapi.LoadBalancing_Scope in order; if one does not match, we should stop.

So I am quite sure this result is not right.

Contributor Author:

Fixed it.

Member:

There is no test coverage; I will write a test later to test your function. I feel this function is still not correct.

}
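For reference, here is a sketch of the in-order matching the reviewer asks for: scopes from rp are compared one by one and counting stops at the first mismatch, so a workload can never be ranked by a later scope after failing an earlier one. This is illustrative only and not the code merged in this PR; it would live in package bpfcache next to CalcLocalityLBPrio.

package bpfcache

import "kmesh.net/kmesh/api/v2/workloadapi"

// calcPrioInOrder ranks a workload by how many leading scopes in rp it matches.
func calcPrioInOrder(l *localityInfo, wl *workloadapi.Workload, rp []workloadapi.LoadBalancing_Scope) uint32 {
	var rank uint32
	for _, scope := range rp {
		matched := false
		switch scope {
		case workloadapi.LoadBalancing_REGION:
			matched = l.region == wl.GetLocality().GetRegion()
		case workloadapi.LoadBalancing_ZONE:
			matched = l.zone == wl.GetLocality().GetZone()
		case workloadapi.LoadBalancing_SUBZONE:
			matched = l.subZone == wl.GetLocality().GetSubzone()
		case workloadapi.LoadBalancing_NODE:
			matched = l.nodeName == wl.GetNode()
		case workloadapi.LoadBalancing_NETWORK:
			matched = l.network == wl.GetNetwork()
		case workloadapi.LoadBalancing_CLUSTER:
			matched = l.clusterId == wl.GetClusterId()
		}
		if !matched {
			break // stop at the first scope that does not match
		}
		rank++
	}
	return uint32(len(rp)) - rank // 0 is the best priority, len(rp) the worst
}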
6 changes: 3 additions & 3 deletions pkg/controller/workload/bpfcache/service.go
@@ -32,9 +32,9 @@ type ServicePorts [MaxPortNum]uint32
type TargetPorts [MaxPortNum]uint32

type ServiceValue struct {
EndpointCount uint32 // endpoint count of current service
LbPolicy uint32 // load balancing algorithm, currently only supports random algorithm
ServicePort ServicePorts // ServicePort[i] and TargetPort[i] are a pair, i starts from 0 and max value is MaxPortNum-1
EndpointCount [PrioCount]uint32 // per-priority endpoint count of the current service
LbPolicy uint32 // load balancing algorithm, currently supports random and locality (failover/strict) load balancing
ServicePort ServicePorts // ServicePort[i] and TargetPort[i] are a pair, i starts from 0 and max value is MaxPortNum-1
TargetPort TargetPorts
WaypointAddr [16]byte
WaypointPort uint32
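Putting the Go pieces together, here is a rough sketch of how an endpoint could be inserted under the new per-priority layout: the priority from CalcLocalityLBPrio becomes both EndpointKey.Prio and the index into ServiceValue.EndpointCount. The EndpointValue.BackendUid field and the Cache.EndpointUpdate call are assumptions about the existing cache API, not something introduced by this diff.

package example

import (
	"kmesh.net/kmesh/api/v2/workloadapi"
	"kmesh.net/kmesh/pkg/controller/workload/bpfcache"
)

// addEndpoint is an illustrative helper, not part of this PR: it appends a
// backend to the priority bucket chosen by CalcLocalityLBPrio.
func addEndpoint(c *bpfcache.Cache, lc *bpfcache.LocalityCache, sv *bpfcache.ServiceValue,
	serviceId, backendUid uint32, wl *workloadapi.Workload, rp []workloadapi.LoadBalancing_Scope) error {
	prio := lc.CalcLocalityLBPrio(wl, rp) // 0 when every configured scope matches
	sv.EndpointCount[prio]++              // the bucket grows by one slot
	ek := bpfcache.EndpointKey{
		ServiceId:    serviceId,
		Prio:         prio,
		BackendIndex: sv.EndpointCount[prio], // 1-based index inside the bucket
	}
	ev := bpfcache.EndpointValue{BackendUid: backendUid} // assumed field name
	return c.EndpointUpdate(&ek, &ev)                    // assumed cache method
}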