Add locality loadbalance to kmesh workload mode. #900

Open · wants to merge 5 commits into base: main
58 changes: 53 additions & 5 deletions bpf/kmesh/workload/include/service.h
@@ -17,9 +17,13 @@ static inline int lb_random_handle(struct kmesh_context *kmesh_ctx, __u32 service_id, service_value *service_v)
int ret = 0;
endpoint_key endpoint_k = {0};
endpoint_value *endpoint_v = NULL;
int rand_k = 0;

endpoint_k.service_id = service_id;
endpoint_k.backend_index = bpf_get_prandom_u32() % service_v->endpoint_count + 1;
endpoint_k.prio = MIN_PRIO; // for the random handler, all endpoints are stored at MIN_PRIO

rand_k = bpf_get_prandom_u32() % service_v->prio_endpoint_count[MIN_PRIO] + 1;
Member: we can return fast by checking 0 here, since you removed that check from the caller
Member: short circuit

endpoint_k.backend_index = rand_k;

endpoint_v = map_lookup_endpoint(&endpoint_k);
if (!endpoint_v) {
@@ -37,6 +41,47 @@
return 0;
}
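The early return suggested above would check service_v->prio_endpoint_count[MIN_PRIO] == 0 at the top of lb_random_handle and bail out immediately, mirroring the endpoint_count check that this change removes from service_manager below.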

static inline int
lb_locality_failover_handle(struct kmesh_context *kmesh_ctx, __u32 service_id, service_value *service_v, bool is_strict)
{
int ret = 0;
uint32_t rand_k = 0;
endpoint_key endpoint_k = {0};
endpoint_value *endpoint_v = NULL;
endpoint_k.service_id = service_id;

// #pragma unroll
for (int match_prio = 0; match_prio < PRIO_COUNT; match_prio++) {
endpoint_k.prio = match_prio; // 0->6
// if we have endpoints in this prio
if (service_v->prio_endpoint_count[match_prio] > 0) {
rand_k = bpf_get_prandom_u32() % service_v->prio_endpoint_count[match_prio] + 1;
if (rand_k >= MAP_SIZE_OF_BACKEND) {
return -ENOENT;
}
endpoint_k.backend_index = rand_k;
endpoint_v = map_lookup_endpoint(&endpoint_k);
if (!endpoint_v) {
BPF_LOG(
ERR, SERVICE, "find endpoint [%u/%u/%u] failed", service_id, match_prio, endpoint_k.backend_index);
return -ENOENT;
}
ret = endpoint_manager(kmesh_ctx, endpoint_v, service_id, service_v);
if (ret != 0) {
if (ret != -ENOENT)
BPF_LOG(ERR, SERVICE, "endpoint_manager failed, ret:%d\n", ret);
return ret;
}
return 0; // found a backend successfully
}
if (is_strict && match_prio == service_v->lb_strict_prio) { // strict mode: give up once the strict priority level has been checked
Member: can you explain a little bit more? I am confused about this

return -ENOENT;
Member: this does not align with ztunnel behavior; this return value would make the traffic go through as if it were directly accessing the clusterIP, rather than blocking the traffic

}
}
// no backend matched
return -ENOENT;
}
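For illustration: with a routing preference of three scopes, the Go side sets lb_strict_prio = MIN_PRIO - 3 = 3. Failover mode walks priorities 0 through 6 and picks a random endpoint from the first non-empty bucket, while strict mode returns -ENOENT as soon as the bucket at priority 3 has been checked, i.e. in this example it only accepts endpoints that match every preferred scope.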

static inline int service_manager(struct kmesh_context *kmesh_ctx, __u32 service_id, service_value *service_v)
{
int ret = 0;
@@ -55,15 +100,18 @@ static inline int service_manager(struct kmesh_context *kmesh_ctx, __u32 service_id, service_value *service_v)
return ret;
}

if (service_v->endpoint_count == 0) {
BPF_LOG(DEBUG, SERVICE, "service %u has no endpoint", service_id);
return 0;
}
BPF_LOG(DEBUG, SERVICE, "service [%u] policy [%u] failed", service_id, service_v->lb_policy);

switch (service_v->lb_policy) {
case LB_POLICY_RANDOM:
ret = lb_random_handle(kmesh_ctx, service_id, service_v);
break;
case LB_POLICY_STRICT:
ret = lb_locality_failover_handle(kmesh_ctx, service_id, service_v, true);
break;
case LB_POLICY_FAILOVER:
ret = lb_locality_failover_handle(kmesh_ctx, service_id, service_v, false);
break;
default:
BPF_LOG(ERR, SERVICE, "unsupported load balance type:%u\n", service_v->lb_policy);
ret = -EINVAL;
9 changes: 7 additions & 2 deletions bpf/kmesh/workload/include/workload.h
@@ -10,6 +10,8 @@
#define MAX_PORT_COUNT 10
#define MAX_SERVICE_COUNT 10
#define RINGBUF_SIZE (1 << 12)
#define MIN_PRIO 6
Member: redundant with PRIO_COUNT

#define PRIO_COUNT 7
#define MAX_MEMBER_NUM_PER_POLICY 4

#pragma pack(1)
@@ -28,8 +30,10 @@ typedef struct {
} service_key;

typedef struct {
__u32 endpoint_count; // endpoint count of current service
__u32 lb_policy; // load balancing algorithm, currently only supports random algorithm
__u32 prio_endpoint_count[PRIO_COUNT]; // per-priority endpoint count of the current service
__u32 lb_policy; // load balancing algorithm; currently supports random and locality load balancing
// (failover/strict modes)
__u32 lb_strict_prio; // for failover strict mode
Member: seems this is not needed; LB_POLICY_FAILOVER and LB_POLICY_STRICT values are supported

__u32 service_port[MAX_PORT_COUNT]; // service_port[i] and target_port[i] are a pair, i starts from 0 and max value
// is MAX_PORT_COUNT-1
__u32 target_port[MAX_PORT_COUNT];
@@ -40,6 +44,7 @@ typedef struct {
// endpoint map
typedef struct {
__u32 service_id; // service id
__u32 prio; // 0 means highest prio (matches all scopes), 6 means lowest prio
__u32 backend_index; // if prio_endpoint_count = 3, then backend_index = 1/2/3
} endpoint_key;
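For illustration: a service whose prio_endpoint_count is {0, 0, 0, 0, 2, 0, 3} stores its endpoints under the keys {service_id, prio=4, backend_index=1..2} and {service_id, prio=6, backend_index=1..3}; backend_index is 1-based within each priority bucket.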

2 changes: 2 additions & 0 deletions bpf/kmesh/workload/include/workload_common.h
@@ -22,6 +22,8 @@
// loadbalance type
typedef enum {
LB_POLICY_RANDOM = 0,
LB_POLICY_STRICT = 1,
LB_POLICY_FAILOVER = 2,
} lb_policy_t;

#pragma pack(1)
11 changes: 10 additions & 1 deletion pkg/controller/workload/bpfcache/endpoint.go
@@ -21,8 +21,14 @@ import (
"istio.io/istio/pkg/util/sets"
)

const (
MinPrio = 6
PrioCount = 7
)

type EndpointKey struct {
ServiceId uint32 // service id
Prio uint32
BackendIndex uint32 // if endpoint_count = 3, then backend_index = 1/2/3
}

@@ -59,15 +65,17 @@ func (c *Cache) EndpointDelete(key *EndpointKey) error {
}

// EndpointSwap updates the last endpoint index and removes the current endpoint
func (c *Cache) EndpointSwap(currentIndex, lastIndex uint32, serviceId uint32) error {
func (c *Cache) EndpointSwap(currentIndex, lastIndex uint32, serviceId uint32, prio uint32) error {
if currentIndex == lastIndex {
return c.EndpointDelete(&EndpointKey{
ServiceId: serviceId,
Prio: prio,
BackendIndex: lastIndex,
})
}
lastKey := &EndpointKey{
ServiceId: serviceId,
Prio: prio,
BackendIndex: lastIndex,
}
lastValue := &EndpointValue{}
@@ -77,6 +85,7 @@

currentKey := &EndpointKey{
ServiceId: serviceId,
Prio: prio,
BackendIndex: currentIndex,
}
currentValue := &EndpointValue{}
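A hedged usage sketch of the updated signature (the helper name and variables are hypothetical, and it assumes an already populated Cache); callers now also pass the priority bucket:

// removeEndpointSketch removes the endpoint at backend index idx of service svcID
// from the lowest-priority bucket by swapping in that bucket's last endpoint;
// last is the bucket's current endpoint count (backend indices are 1-based).
func removeEndpointSketch(c *Cache, svcID, idx, last uint32) error {
	return c.EndpointSwap(idx, last, svcID, MinPrio)
}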
170 changes: 170 additions & 0 deletions pkg/controller/workload/bpfcache/locality_cache.go
@@ -0,0 +1,170 @@
/*
* Copyright The Kmesh Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package bpfcache

import (
"kmesh.net/kmesh/api/v2/workloadapi"
)

const (
REGION = 1 << iota // 000001
ZONE // 000010
SUBZONE // 000100
NODENAME // 001000
CLUSTERID // 010000
NETWORK // 100000
)

type localityInfo struct {
region string // init from workload.GetLocality().GetRegion()
zone string // init from workload.GetLocality().GetZone()
subZone string // init from workload.GetLocality().GetSubZone()
nodeName string // init from os.Getenv("NODE_NAME"), workload.GetNode()
clusterId string // init from workload.GetClusterId()
network string // workload.GetNetwork()
mask uint32 // bitmask recording which locality fields are set
}

func Valid(s string) bool {
return s != ""
}

func (l *localityInfo) Set(s string, param uint32) {
if !Valid(s) {
return
}
switch param {
case REGION:
l.region = s
case ZONE:
l.zone = s
case SUBZONE:
l.subZone = s
case NODENAME:
l.nodeName = s
case CLUSTERID:
l.clusterId = s
case NETWORK:
l.network = s
}
l.mask |= param
}

func (l *localityInfo) Clear(param uint32) {
l.mask &= ^param
}

func (l *localityInfo) IsSet(param uint32) bool {
return l.mask&param != 0
}

type LocalityCache struct {
LbPolicy uint32
localityInfo localityInfo
LbStrictPrio uint32 // for failover strict mode
isLocalityInfoSet bool
RoutingPreference []workloadapi.LoadBalancing_Scope
isRoutingPreferenceSet bool
workloadWaitQueue map[*workloadapi.Workload]struct{}
}

func NewLocalityCache() *LocalityCache {
return &LocalityCache{
localityInfo: localityInfo{},
isLocalityInfoSet: false,
RoutingPreference: make([]workloadapi.LoadBalancing_Scope, 0),
isRoutingPreferenceSet: false,
workloadWaitQueue: make(map[*workloadapi.Workload]struct{}),
}
}

func (l *LocalityCache) SetLocality(nodeName, clusterId, network string, locality *workloadapi.Locality) {
// note: nodeName should be set by the processor or from os.Getenv("NODE_NAME")
l.localityInfo.Set(nodeName, NODENAME)
l.localityInfo.Set(locality.GetRegion(), REGION)
l.localityInfo.Set(locality.GetSubzone(), SUBZONE)
l.localityInfo.Set(locality.GetZone(), ZONE)
l.localityInfo.Set(clusterId, CLUSTERID)
l.localityInfo.Set(network, NETWORK)

l.isLocalityInfoSet = true
}

func (l *LocalityCache) SetRoutingPreference(s []workloadapi.LoadBalancing_Scope) {
// note: s should be set from lb.GetRoutingPreference()
if len(s) > 0 {
l.RoutingPreference = s
l.LbStrictPrio = uint32(MinPrio - len(s))
l.isRoutingPreferenceSet = true
}
}

func (l *LocalityCache) CanLocalityLB() bool {
log.Debugf("isLocalityInfoSet: %#v, isRoutingPreferenceSet: %#v", l.isLocalityInfoSet, l.isRoutingPreferenceSet)
return l.isLocalityInfoSet && l.isRoutingPreferenceSet
}

func (l *LocalityCache) CalcuLocalityLBPrio(wl *workloadapi.Workload) uint32 {
Member: not sure I understand this. From my perspective, the workload priority has to do with the originating node's info: if the node and the workload both reside in the same RoutingPreference scopes, the workload has the highest priority.

Contributor (author): Yes, the earliest design version worked that way. During a Kmesh community meeting you suggested the current approach: since the kmesh process runs on every node as a DaemonSet, each node can pre-calculate the priority of every workload in a service relative to that node. When traffic later arrives and accesses the service, the backend is selected based on these pre-calculated workload priorities.

var rank uint32 = 0
for _, scope := range l.RoutingPreference {
switch scope {
case workloadapi.LoadBalancing_REGION:
log.Debugf("l.localityInfo.IsSet(REGION) %#v, Valid(wl.GetLocality().GetRegion()) %#v, l.localityInfo.region %#v, wl.GetLocality().GetRegion() %#v", l.localityInfo.IsSet(REGION), Valid(wl.GetLocality().GetRegion()), l.localityInfo.region, wl.GetLocality().GetRegion())
if l.localityInfo.IsSet(REGION) && Valid(wl.GetLocality().GetRegion()) && l.localityInfo.region == wl.GetLocality().GetRegion() {
rank++
}
case workloadapi.LoadBalancing_ZONE:
log.Debugf("l.localityInfo.IsSet(ZONE) %#v, Valid(wl.GetLocality().GetZone()) %#v, l.localityInfo.zone %#v, wl.GetLocality().GetZone() %#v", l.localityInfo.IsSet(ZONE), Valid(wl.GetLocality().GetZone()), l.localityInfo.zone, wl.GetLocality().GetZone())
if l.localityInfo.IsSet(ZONE) && Valid(wl.GetLocality().GetZone()) && l.localityInfo.zone == wl.GetLocality().GetZone() {
rank++
}
case workloadapi.LoadBalancing_SUBZONE:
log.Debugf("l.localityInfo.IsSet(SUBZONE) %#v, Valid(wl.GetLocality().GetSubzone()) %#v, l.localityInfo.subZone %#v, wl.GetLocality().GetSubzone() %#v", l.localityInfo.IsSet(SUBZONE), Valid(wl.GetLocality().GetSubzone()), l.localityInfo.subZone, wl.GetLocality().GetSubzone())
if l.localityInfo.IsSet(SUBZONE) && Valid(wl.GetLocality().GetSubzone()) && l.localityInfo.subZone == wl.GetLocality().GetSubzone() {
rank++
}
case workloadapi.LoadBalancing_NODE:
log.Debugf("l.localityInfo.IsSet(NODENAME) %#v, Valid(wl.GetNode()) %#v, l.localityInfo.nodeName %#v, wl.GetNode() %#v", l.localityInfo.IsSet(NODENAME), Valid(wl.GetNode()), l.localityInfo.nodeName, wl.GetNode())
if l.localityInfo.IsSet(NODENAME) && Valid(wl.GetNode()) && l.localityInfo.nodeName == wl.GetNode() {
rank++
}
case workloadapi.LoadBalancing_NETWORK:
log.Debugf("l.localityInfo.IsSet(NETWORK) %#v, Valid(wl.GetNetwork()) %#v, l.localityInfo.network %#v, wl.GetNetwork() %#v", l.localityInfo.IsSet(NETWORK), Valid(wl.GetNetwork()), l.localityInfo.network, wl.GetNetwork())
if l.localityInfo.IsSet(NETWORK) && Valid(wl.GetNetwork()) && l.localityInfo.network == wl.GetNetwork() {
rank++
}
case workloadapi.LoadBalancing_CLUSTER:
log.Debugf("l.localityInfo.IsSet(CLUSTERID) %#v, Valid(wl.GetClusterId()) %#v, l.localityInfo.clusterId %#v, wl.GetClusterId() %#v", l.localityInfo.IsSet(CLUSTERID), Valid(wl.GetClusterId()), l.localityInfo.clusterId, wl.GetClusterId())
if l.localityInfo.IsSet(CLUSTERID) && Valid(wl.GetClusterId()) && l.localityInfo.clusterId == wl.GetClusterId() {
rank++
}
}
}
return MinPrio - rank
}
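To make the per-node pre-computation described in the thread above concrete, here is a hedged sketch written as a unit test in the bpfcache package; the locality values, routing preference, and expected result are illustrative assumptions, not part of this change (the Workload field literals assume the usual protobuf-generated names behind the GetNode/GetLocality getters used above):

package bpfcache

import (
	"testing"

	"kmesh.net/kmesh/api/v2/workloadapi"
)

// TestCalcuLocalityLBPrioSketch illustrates how a node-local LocalityCache ranks a workload.
func TestCalcuLocalityLBPrioSketch(t *testing.T) {
	lc := NewLocalityCache()
	// Assume this node sits in region "r1", zone "z1", on node "node-a".
	lc.SetLocality("node-a", "cluster0", "net0",
		&workloadapi.Locality{Region: "r1", Zone: "z1"})
	// Assume the service prefers REGION, then ZONE, then NODE.
	lc.SetRoutingPreference([]workloadapi.LoadBalancing_Scope{
		workloadapi.LoadBalancing_REGION,
		workloadapi.LoadBalancing_ZONE,
		workloadapi.LoadBalancing_NODE,
	})
	// LbStrictPrio = MinPrio - 3 = 3, so strict mode only accepts workloads
	// that match all three preferred scopes.

	// A workload in the same region and zone but on another node matches two
	// scopes: rank = 2, prio = MinPrio - rank = 4 (0 is best, 6 is worst).
	wl := &workloadapi.Workload{
		Node:     "node-b",
		Locality: &workloadapi.Locality{Region: "r1", Zone: "z1"},
	}
	if got := lc.CalcuLocalityLBPrio(wl); got != 4 {
		t.Fatalf("expected prio 4, got %d", got)
	}
}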

func (l *LocalityCache) SaveToWaitQueue(wl *workloadapi.Workload) {
l.workloadWaitQueue[wl] = struct{}{}
}

func (l *LocalityCache) DelWorkloadFromWaitQueue(wl *workloadapi.Workload) {
delete(l.workloadWaitQueue, wl)
}

func (l *LocalityCache) GetFromWaitQueue() map[*workloadapi.Workload]struct{} {
return l.workloadWaitQueue
}
5 changes: 3 additions & 2 deletions pkg/controller/workload/bpfcache/service.go
@@ -32,8 +32,9 @@ type ServicePorts [MaxPortNum]uint32
type TargetPorts [MaxPortNum]uint32

type ServiceValue struct {
EndpointCount uint32 // endpoint count of current service
LbPolicy uint32 // load balancing algorithm, currently only supports random algorithm
EndpointCount [PrioCount]uint32 // per-priority endpoint count of the current service
LbPolicy uint32 // load balancing algorithm; currently supports random and locality (failover/strict) load balancing
LbStrictPrio uint32 // for failover strict mode
ServicePort ServicePorts // ServicePort[i] and TargetPort[i] are a pair, i starts from 0 and max value is MaxPortNum-1
TargetPort TargetPorts
WaypointAddr [16]byte