add locality loadbalance
derekwin committed Sep 28, 2024
1 parent 20cb2d8 commit e37cf23
Showing 7 changed files with 284 additions and 27 deletions.
56 changes: 51 additions & 5 deletions bpf/kmesh/workload/include/service.h
@@ -17,9 +17,13 @@ static inline int lb_random_handle(struct kmesh_context *kmesh_ctx, __u32 servic
int ret = 0;
endpoint_key endpoint_k = {0};
endpoint_value *endpoint_v = NULL;
int rand_k = 0;

endpoint_k.service_id = service_id;
endpoint_k.backend_index = bpf_get_prandom_u32() % service_v->endpoint_count + 1;
endpoint_k.prio = MAX_PRIO; // for random handle, all endpoints are saved in MAX_PRIO

rand_k = bpf_get_prandom_u32() % service_v->prio_endpoint_count[MAX_PRIO] + 1;
endpoint_k.backend_index = rand_k;

endpoint_v = map_lookup_endpoint(&endpoint_k);
if (!endpoint_v) {
@@ -37,6 +41,45 @@
return 0;
}

static inline int lb_locality_failover_handle(struct kmesh_context *kmesh_ctx, __u32 service_id, service_value *service_v, bool is_strict)
{
int ret = 0;
uint32_t rand_k = 0;
endpoint_key endpoint_k = {0};
endpoint_value *endpoint_v = NULL;
endpoint_k.service_id = service_id;

// #pragma unroll
for (int match_rank = MAX_PRIO; match_rank >= 0; match_rank--) {
endpoint_k.prio = match_rank; // 6->0
// if we have endpoints in this prio
if (service_v->prio_endpoint_count[match_rank] > 0) {
rand_k = bpf_get_prandom_u32() % service_v->prio_endpoint_count[match_rank] + 1;
if (rand_k >= MAP_SIZE_OF_BACKEND) {
return -ENOENT;
}
endpoint_k.backend_index = rand_k;
endpoint_v = map_lookup_endpoint(&endpoint_k);
if (!endpoint_v) {
BPF_LOG(ERR, SERVICE, "find endpoint [%u/%u/%u] failed", service_id, match_rank, endpoint_k.backend_index);
return -ENOENT;
}
ret = endpoint_manager(kmesh_ctx, endpoint_v, service_id, service_v);
if (ret != 0) {
if (ret != -ENOENT)
BPF_LOG(ERR, SERVICE, "endpoint_manager failed, ret:%d\n", ret);
return ret;
}
return 0; // find the backend successfully
}
if (is_strict && match_rank == service_v->lb_strict_index) { // strict mode: do not fall back below lb_strict_index
return -ENOENT;
}
}
// no backend matched
return -ENOENT;
}

static inline int service_manager(struct kmesh_context *kmesh_ctx, __u32 service_id, service_value *service_v)
{
int ret = 0;
@@ -55,15 +98,18 @@ static inline int service_manager(struct kmesh_context *kmesh_ctx, __u32 service
return ret;
}

if (service_v->endpoint_count == 0) {
BPF_LOG(DEBUG, SERVICE, "service %u has no endpoint", service_id);
return 0;
}
BPF_LOG(DEBUG, SERVICE, "service [%u] lb policy [%u]", service_id, service_v->lb_policy);

switch (service_v->lb_policy) {
case LB_POLICY_RANDOM:
ret = lb_random_handle(kmesh_ctx, service_id, service_v);
break;
case LB_POLICY_STRICT:
ret = lb_locality_failover_handle(kmesh_ctx, service_id, service_v, true);
break;
case LB_POLICY_FAILOVER:
ret = lb_locality_failover_handle(kmesh_ctx, service_id, service_v, false);
break;
default:
BPF_LOG(ERR, SERVICE, "unsupported load balance type:%u\n", service_v->lb_policy);
ret = -EINVAL;
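For illustration, here is a minimal user-space Go sketch of the selection logic that lb_locality_failover_handle implements in eBPF: walk the priority buckets from high to low, pick a random 1-based backend index from the first non-empty bucket, and in strict mode refuse to fall back below the strict index. It is not part of this commit, and the counts and strict index used in main below are made up.

package main

import (
	"fmt"
	"math/rand"
)

const maxPrio = 6 // mirrors MAX_PRIO in workload.h

// pickBackend walks the priority buckets from high (all scopes matched) to
// low and picks a random 1-based backend index from the first non-empty
// bucket, mirroring lb_locality_failover_handle. In strict mode it stops
// once it reaches strictIndex instead of falling back further.
func pickBackend(countPerPrio [maxPrio + 1]uint32, strictIndex uint32, strict bool) (prio, backendIndex uint32, ok bool) {
	for p := maxPrio; p >= 0; p-- {
		if countPerPrio[p] > 0 {
			return uint32(p), uint32(rand.Intn(int(countPerPrio[p]))) + 1, true
		}
		if strict && uint32(p) == strictIndex {
			return 0, 0, false // strict mode: no fallback past the strict index
		}
	}
	return 0, 0, false // no backend in any bucket
}

func main() {
	// Illustrative counts: 3 endpoints matched every scope (prio 6),
	// 5 endpoints matched only some scopes (prio 4).
	counts := [maxPrio + 1]uint32{6: 3, 4: 5}
	prio, idx, ok := pickBackend(counts, 3, false)
	fmt.Printf("failover picked prio=%d backend_index=%d ok=%v\n", prio, idx, ok)
}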
8 changes: 6 additions & 2 deletions bpf/kmesh/workload/include/workload.h
@@ -10,6 +10,8 @@
#define MAX_PORT_COUNT 10
#define MAX_SERVICE_COUNT 10
#define RINGBUF_SIZE (1 << 12)
#define MAX_PRIO 6
#define MAX_PRIO_COUNT (MAX_PRIO + 1) // prio 6 means all scopes matched, prio 0 means none matched
#define MAX_MEMBER_NUM_PER_POLICY 4

#pragma pack(1)
@@ -28,8 +30,9 @@ typedef struct {
} service_key;

typedef struct {
__u32 endpoint_count; // endpoint count of current service
__u32 lb_policy; // load balancing algorithm, currently only supports random algorithm
__u32 prio_endpoint_count[MAX_PRIO_COUNT]; // endpoint count of the current service per priority, from prio 6 down to 0
__u32 lb_policy; // load balancing algorithm; currently supports random, locality failover and locality strict modes
__u32 lb_strict_index; // for failover strict mode
__u32 service_port[MAX_PORT_COUNT]; // service_port[i] and target_port[i] are a pair, i starts from 0 and max value
// is MAX_PORT_COUNT-1
__u32 target_port[MAX_PORT_COUNT];
@@ -40,6 +43,7 @@ typedef struct {
// endpoint map
typedef struct {
__u32 service_id; // service id
__u32 prio; // priority rank: 6 means all scopes matched, 0 means none matched
__u32 backend_index; // if prio_endpoint_count = 3, then backend_index = 1/2/3
} endpoint_key;

2 changes: 2 additions & 0 deletions bpf/kmesh/workload/include/workload_common.h
@@ -22,6 +22,8 @@
// loadbalance type
typedef enum {
LB_POLICY_RANDOM = 0,
LB_POLICY_STRICT = 1,
LB_POLICY_FAILOVER = 2,
} lb_policy_t;

#pragma pack(1)
11 changes: 10 additions & 1 deletion pkg/controller/workload/bpfcache/endpoint.go
@@ -21,8 +21,14 @@ import (
"istio.io/istio/pkg/util/sets"
)

const (
MaxPrio = 6
MaxPrioNum = 7
)

type EndpointKey struct {
ServiceId uint32 // service id
Prio uint32 // priority rank: MaxPrio means all scopes matched, 0 means none matched
BackendIndex uint32 // if endpoint_count = 3, then backend_index = 1/2/3
}

@@ -59,15 +65,17 @@ func (c *Cache) EndpointDelete(key *EndpointKey) error {
}

// EndpointSwap moves the endpoint at lastIndex into currentIndex and removes the lastIndex entry
func (c *Cache) EndpointSwap(currentIndex, lastIndex uint32, serviceId uint32) error {
func (c *Cache) EndpointSwap(currentIndex, lastIndex uint32, serviceId uint32, prio uint32) error {
if currentIndex == lastIndex {
return c.EndpointDelete(&EndpointKey{
ServiceId: serviceId,
Prio: prio,
BackendIndex: lastIndex,
})
}
lastKey := &EndpointKey{
ServiceId: serviceId,
Prio: prio,
BackendIndex: lastIndex,
}
lastValue := &EndpointValue{}
@@ -77,6 +85,7 @@ func (c *Cache) EndpointSwap(currentIndex, lastIndex uint32, serviceId uint32) e

currentKey := &EndpointKey{
ServiceId: serviceId,
Prio: prio,
BackendIndex: currentIndex,
}
currentValue := &EndpointValue{}
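EndpointSwap keeps each (service, prio) bucket densely indexed by copying the endpoint at lastIndex into the freed currentIndex slot and then deleting the lastIndex entry. The following self-contained Go sketch shows the same swap-and-delete pattern on an ordinary map; the key and value types are hypothetical stand-ins for the real BPF map entries and are not part of this commit.

package main

import "fmt"

// key is a hypothetical stand-in for EndpointKey.
type key struct{ serviceID, prio, backendIndex uint32 }

// removeDense removes the endpoint at curIdx from a dense 1..lastIdx range
// by copying the last entry into curIdx and deleting the last slot, which is
// the pattern EndpointSwap applies against the BPF endpoint map.
func removeDense(m map[key]string, serviceID, prio, curIdx, lastIdx uint32) {
	if curIdx != lastIdx {
		m[key{serviceID, prio, curIdx}] = m[key{serviceID, prio, lastIdx}]
	}
	delete(m, key{serviceID, prio, lastIdx})
}

func main() {
	m := map[key]string{
		{1, 6, 1}: "10.0.0.1",
		{1, 6, 2}: "10.0.0.2",
		{1, 6, 3}: "10.0.0.3",
	}
	removeDense(m, 1, 6, 1, 3) // remove backend 1; backend 3 takes its slot
	fmt.Println(m)
}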
154 changes: 154 additions & 0 deletions pkg/controller/workload/bpfcache/locality_cache.go
@@ -0,0 +1,154 @@
package bpfcache

import (
"kmesh.net/kmesh/api/v2/workloadapi"
)

const (
REGION = 1 << iota // 000001
ZONE // 000010
SUBZONE // 000100
NODENAME // 001000
CLUSTERID // 010000
NETWORK // 100000
)

type localityInfo struct {
region string // init from workload.GetLocality().GetRegion()
zone string // init from workload.GetLocality().GetZone()
subZone string // init from workload.GetLocality().GetSubZone()
nodeName string // init from os.Getenv("NODE_NAME"), workload.GetNode()
clusterId string // init from workload.GetClusterId()
network string // workload.GetNetwork()
mask uint32 // mask
}

func Valid(s string) bool {
return s != ""
}

func (l *localityInfo) Set(s string, param uint32) {
if !Valid(s) {
return
}
switch param {
case REGION:
l.region = s
case ZONE:
l.zone = s
case SUBZONE:
l.subZone = s
case NODENAME:
l.nodeName = s
case CLUSTERID:
l.clusterId = s
case NETWORK:
l.network = s
}
l.mask |= param
}

func (l *localityInfo) Clear(param uint32) {
l.mask &= ^param
}

func (l *localityInfo) IsSet(param uint32) bool {
return l.mask&param != 0
}

type LocalityCache struct {
LbPolicy uint32
localityInfo localityInfo
LbStrictIndex uint32 // for failover strict mode
isLocalityInfoSet bool
RoutingPreference []workloadapi.LoadBalancing_Scope
isRoutingPreferenceSet bool
workloadWaitQueue map[*workloadapi.Workload]struct{}
}

func NewLocalityCache() *LocalityCache {
return &LocalityCache{
localityInfo: localityInfo{},
isLocalityInfoSet: false,
RoutingPreference: make([]workloadapi.LoadBalancing_Scope, 0),
isRoutingPreferenceSet: false,
workloadWaitQueue: make(map[*workloadapi.Workload]struct{}),
}
}

func (l *LocalityCache) SetLocality(nodeName, clusterId, network string, locality *workloadapi.Locality) {
// notice: nodeName should be set by the processor or os.Getenv("NODE_NAME")
l.localityInfo.Set(nodeName, NODENAME)
l.localityInfo.Set(locality.GetRegion(), REGION)
l.localityInfo.Set(locality.GetSubzone(), SUBZONE)
l.localityInfo.Set(locality.GetZone(), ZONE)
l.localityInfo.Set(clusterId, CLUSTERID)
l.localityInfo.Set(network, NETWORK)

l.isLocalityInfoSet = true
}

func (l *LocalityCache) SetRoutingPreference(s []workloadapi.LoadBalancing_Scope) {
// notice: s should be set by lb.GetRoutingPreference()
if len(s) > 0 {
l.RoutingPreference = s
l.LbStrictIndex = uint32(len(s))
l.isRoutingPreferenceSet = true
}
}

func (l *LocalityCache) CanLocalityLB() bool {
log.Debugf("isLocalityInfoSet: %#v, isRoutingPreferenceSet: %#v", l.isLocalityInfoSet, l.isRoutingPreferenceSet)
return l.isLocalityInfoSet && l.isRoutingPreferenceSet
}

func (l *LocalityCache) CalcuLocalityLBPrio(wl *workloadapi.Workload) uint32 {
var rank uint32 = 0
for _, scope := range l.RoutingPreference {
switch scope {
case workloadapi.LoadBalancing_REGION:
log.Debugf("l.localityInfo.IsSet(REGION) %#v, Valid(wl.GetLocality().GetRegion()) %#v, l.localityInfo.region %#v, wl.GetLocality().GetRegion() %#v", l.localityInfo.IsSet(REGION), Valid(wl.GetLocality().GetRegion()), l.localityInfo.region, wl.GetLocality().GetRegion())
if l.localityInfo.IsSet(REGION) && Valid(wl.GetLocality().GetRegion()) && l.localityInfo.region == wl.GetLocality().GetRegion() {
rank++
}
case workloadapi.LoadBalancing_ZONE:
log.Debugf("l.localityInfo.IsSet(ZONE) %#v, Valid(wl.GetLocality().GetZone()) %#v, l.localityInfo.zone %#v, wl.GetLocality().GetZone() %#v", l.localityInfo.IsSet(ZONE), Valid(wl.GetLocality().GetZone()), l.localityInfo.zone, wl.GetLocality().GetZone())
if l.localityInfo.IsSet(ZONE) && Valid(wl.GetLocality().GetZone()) && l.localityInfo.zone == wl.GetLocality().GetZone() {
rank++
}
case workloadapi.LoadBalancing_SUBZONE:
log.Debugf("l.localityInfo.IsSet(SUBZONE) %#v, Valid(wl.GetLocality().GetSubzone()) %#v, l.localityInfo.subZone %#v, wl.GetLocality().GetSubzone() %#v", l.localityInfo.IsSet(SUBZONE), Valid(wl.GetLocality().GetSubzone()), l.localityInfo.subZone, wl.GetLocality().GetSubzone())
if l.localityInfo.IsSet(SUBZONE) && Valid(wl.GetLocality().GetSubzone()) && l.localityInfo.subZone == wl.GetLocality().GetSubzone() {
rank++
}
case workloadapi.LoadBalancing_NODE:
log.Debugf("l.localityInfo.IsSet(NODENAME) %#v, Valid(wl.GetNode()) %#v, l.localityInfo.nodeName %#v, wl.GetNode() %#v", l.localityInfo.IsSet(NODENAME), Valid(wl.GetNode()), l.localityInfo.nodeName, wl.GetNode())
if l.localityInfo.IsSet(NODENAME) && Valid(wl.GetNode()) && l.localityInfo.nodeName == wl.GetNode() {
rank++
}
case workloadapi.LoadBalancing_NETWORK:
log.Debugf("l.localityInfo.IsSet(NETWORK) %#v, Valid(wl.GetNetwork()) %#v, l.localityInfo.network %#v, wl.GetNetwork() %#v", l.localityInfo.IsSet(NETWORK), Valid(wl.GetNetwork()), l.localityInfo.network, wl.GetNetwork())
if l.localityInfo.IsSet(NETWORK) && Valid(wl.GetNetwork()) && l.localityInfo.network == wl.GetNetwork() {
rank++
}
case workloadapi.LoadBalancing_CLUSTER:
log.Debugf("l.localityInfo.IsSet(CLUSTERID) %#v, Valid(wl.GetClusterId()) %#v, l.localityInfo.clusterId %#v, wl.GetClusterId() %#v", l.localityInfo.IsSet(CLUSTERID), Valid(wl.GetClusterId()), l.localityInfo.clusterId, wl.GetClusterId())
if l.localityInfo.IsSet(CLUSTERID) && Valid(wl.GetClusterId()) && l.localityInfo.clusterId == wl.GetClusterId() {
rank++
}
}
}
return rank
}

func (l *LocalityCache) SaveToWaitQueue(wl *workloadapi.Workload) {
l.workloadWaitQueue[wl] = struct{}{}
}

func (l *LocalityCache) DelWorkloadFromWaitQueue(wl *workloadapi.Workload) {
delete(l.workloadWaitQueue, wl)
}

func (l *LocalityCache) GetFromWaitQueue() map[*workloadapi.Workload]struct{} {
return l.workloadWaitQueue
}
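
CalcuLocalityLBPrio ranks a workload by counting how many of the configured routing-preference scopes match the local node's locality; in strict mode only backends whose rank reaches LbStrictIndex (the number of configured scopes) are eligible. The sketch below shows that ranking idea with plain structs standing in for the workloadapi types; the locality values and scope names are illustrative only and not part of this commit.

package main

import "fmt"

// locality is a stand-in for the fields read from workloadapi.Locality.
type locality struct{ region, zone, subZone string }

// rank counts how many preferred scopes match between the local locality
// and a workload's locality, in the spirit of CalcuLocalityLBPrio.
func rank(local, wl locality, prefs []string) uint32 {
	var r uint32
	for _, scope := range prefs {
		switch scope {
		case "REGION":
			if local.region != "" && local.region == wl.region {
				r++
			}
		case "ZONE":
			if local.zone != "" && local.zone == wl.zone {
				r++
			}
		case "SUBZONE":
			if local.subZone != "" && local.subZone == wl.subZone {
				r++
			}
		}
	}
	return r
}

func main() {
	local := locality{region: "eu-west", zone: "eu-west-1a", subZone: "rack7"}
	wl := locality{region: "eu-west", zone: "eu-west-1a", subZone: "rack9"}
	// With preferences [REGION, ZONE, SUBZONE] this workload ranks 2:
	// region and zone match, subzone does not.
	fmt.Println(rank(local, wl, []string{"REGION", "ZONE", "SUBZONE"}))
}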
5 changes: 3 additions & 2 deletions pkg/controller/workload/bpfcache/service.go
@@ -32,8 +32,9 @@ type ServicePorts [MaxPortNum]uint32
type TargetPorts [MaxPortNum]uint32

type ServiceValue struct {
EndpointCount uint32 // endpoint count of current service
LbPolicy uint32 // load balancing algorithm, currently only supports random algorithm
EndpointCount [MaxPrioNum]uint32 // endpoint count of the current service, per priority (prio 6 -> 0)
LbPolicy uint32 // load balancing algorithm; currently supports random, locality failover and locality strict modes
LbStrictIndex uint32 // for locality strict mode: the number of configured routing preference scopes
ServicePort ServicePorts // ServicePort[i] and TargetPort[i] are a pair, i starts from 0 and max value is MaxPortNum-1
TargetPort TargetPorts
WaypointAddr [16]byte