Add locality loadbalance to kmesh workload mode. #900

Merged: 13 commits, Oct 30, 2024
64 changes: 59 additions & 5 deletions bpf/kmesh/workload/include/service.h
@@ -17,9 +17,16 @@ static inline int lb_random_handle(struct kmesh_context *kmesh_ctx, __u32 service_id, service_value *service_v)
    int ret = 0;
    endpoint_key endpoint_k = {0};
    endpoint_value *endpoint_v = NULL;
+   int rand_k = 0;

+   if (service_v->prio_endpoint_count[0] == 0)
+       return 0;
+
    endpoint_k.service_id = service_id;
-   endpoint_k.backend_index = bpf_get_prandom_u32() % service_v->endpoint_count + 1;
+   endpoint_k.prio = 0; // for the random handler, all endpoints are saved with the highest priority
+
+   rand_k = bpf_get_prandom_u32() % service_v->prio_endpoint_count[0] + 1;
+   endpoint_k.backend_index = rand_k;

    endpoint_v = map_lookup_endpoint(&endpoint_k);
    if (!endpoint_v) {
@@ -37,6 +44,50 @@ static inline int lb_random_handle(struct kmesh_context *kmesh_ctx, __u32 service_id, service_value *service_v)
    return 0;
}
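For orientation, here is a minimal Go model of the random handler above (a userspace sketch, not code from this PR; the function and parameter names are hypothetical): random load balancing keeps every endpoint in priority bucket 0 and draws a 1-based index.

```go
package main

import (
	"fmt"
	"math/rand"
)

// randomPick models lb_random_handle: all endpoints live in priority bucket 0,
// and backend_index is 1-based, i.e. 1..count.
func randomPick(prioEndpointCount [7]uint32) (backendIndex uint32, ok bool) {
	if prioEndpointCount[0] == 0 {
		return 0, false // the service currently has no endpoints
	}
	return rand.Uint32()%prioEndpointCount[0] + 1, true
}

func main() {
	idx, ok := randomPick([7]uint32{3}) // 3 endpoints at prio 0
	fmt.Println(idx, ok)                // idx is always in 1..3
}
```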

static inline int
lb_locality_failover_handle(struct kmesh_context *kmesh_ctx, __u32 service_id, service_value *service_v, bool is_strict)
{
    int ret = 0;
    uint32_t rand_k = 0;
    endpoint_key endpoint_k = {0};
    endpoint_value *endpoint_v = NULL;
    endpoint_k.service_id = service_id;
    struct ip_addr zero_addr = {0};
    __u32 zero_port = 0;

    // #pragma unroll
    for (int match_prio = 0; match_prio < PRIO_COUNT; match_prio++) {
        endpoint_k.prio = match_prio; // 0->6
        // if we have endpoints in this prio
        if (service_v->prio_endpoint_count[match_prio] > 0) {
            rand_k = bpf_get_prandom_u32() % service_v->prio_endpoint_count[match_prio] + 1;
            endpoint_k.backend_index = rand_k;
            endpoint_v = map_lookup_endpoint(&endpoint_k);
            if (!endpoint_v) {
                BPF_LOG(
                    ERR, SERVICE, "find endpoint [%u/%u/%u] failed", service_id, match_prio, endpoint_k.backend_index);
                return -ENOENT;
            }
            ret = endpoint_manager(kmesh_ctx, endpoint_v, service_id, service_v);
            if (ret != 0) {
                if (ret != -ENOENT)
                    BPF_LOG(ERR, SERVICE, "endpoint_manager failed, ret:%d\n", ret);
                return ret;
            }
            BPF_LOG(DEBUG, SERVICE, "locality loadbalance matched backend_uid %d\n", endpoint_v->backend_uid);
            return 0; // found a backend successfully
        }
        if (is_strict) { // in strict mode, only the highest priority may match
            kmesh_ctx->dnat_ip = zero_addr;
            kmesh_ctx->dnat_port = zero_port;
            BPF_LOG(DEBUG, SERVICE, "locality loadbalance matched nothing in STRICT mode\n");
            return -ENOENT;
        }
    }
    // no backend matched
    return -ENOENT;
}
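The same walk, modeled in Go (a sketch assuming seven priority buckets; all names are illustrative): FAILOVER descends from the best bucket until one is non-empty, while STRICT gives up unless the very first bucket has endpoints.

```go
package main

import (
	"fmt"
	"math/rand"
)

// pickBackend models lb_locality_failover_handle in userspace (hypothetical).
// buckets[0] holds the best-matching endpoints, buckets[6] the worst.
func pickBackend(buckets [7][]string, strict bool) (string, bool) {
	for prio := 0; prio < len(buckets); prio++ {
		if n := len(buckets[prio]); n > 0 {
			return buckets[prio][rand.Intn(n)], true // random pick inside the best non-empty bucket
		}
		if strict {
			return "", false // STRICT: only the highest-priority bucket may serve
		}
	}
	return "", false // FAILOVER: every bucket was empty
}

func main() {
	var buckets [7][]string
	buckets[2] = []string{"10.0.0.1:80", "10.0.0.2:80"}
	fmt.Println(pickBackend(buckets, false)) // failover reaches prio 2
	fmt.Println(pickBackend(buckets, true))  // strict finds prio 0 empty and fails
}
```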

static inline int service_manager(struct kmesh_context *kmesh_ctx, __u32 service_id, service_value *service_v)
{
    int ret = 0;
@@ -55,15 +106,18 @@ static inline int service_manager(struct kmesh_context *kmesh_ctx, __u32 service_id, service_value *service_v)
        return ret;
    }

-   if (service_v->endpoint_count == 0) {
-       BPF_LOG(DEBUG, SERVICE, "service %u has no endpoint", service_id);
-       return 0;
-   }
+   BPF_LOG(DEBUG, SERVICE, "service [%u] policy [%u] failed", service_id, service_v->lb_policy);

    switch (service_v->lb_policy) {
    case LB_POLICY_RANDOM:
        ret = lb_random_handle(kmesh_ctx, service_id, service_v);
        break;
+   case LB_POLICY_STRICT:
+       ret = lb_locality_failover_handle(kmesh_ctx, service_id, service_v, true);
+       break;
+   case LB_POLICY_FAILOVER:
+       ret = lb_locality_failover_handle(kmesh_ctx, service_id, service_v, false);
+       break;
    default:
        BPF_LOG(ERR, SERVICE, "unsupported load balance type:%u\n", service_v->lb_policy);
        ret = -EINVAL;
7 changes: 5 additions & 2 deletions bpf/kmesh/workload/include/workload.h
@@ -10,6 +10,7 @@
#define MAX_PORT_COUNT 10
#define MAX_SERVICE_COUNT 10
#define RINGBUF_SIZE (1 << 12)
+#define PRIO_COUNT 7
#define MAX_MEMBER_NUM_PER_POLICY 4

#pragma pack(1)
@@ -28,8 +29,9 @@ typedef struct {
} service_key;

typedef struct {
-   __u32 endpoint_count; // endpoint count of current service
-   __u32 lb_policy;      // load balancing algorithm, currently only supports random algorithm
+   __u32 prio_endpoint_count[PRIO_COUNT]; // endpoint count of the current service, per priority
+   __u32 lb_policy;                       // load balancing algorithm; currently supports the random algorithm and
+                                          // locality loadbalance in failover/strict mode
    __u32 service_port[MAX_PORT_COUNT]; // service_port[i] and target_port[i] are a pair, i starts from 0 and max value
                                        // is MAX_PORT_COUNT-1
    __u32 target_port[MAX_PORT_COUNT];
@@ -40,6 +42,7 @@
// endpoint map
typedef struct {
    __u32 service_id; // service id
+   __u32 prio;       // 0 means the highest priority and matches all scopes; 6 means the lowest priority
    __u32 backend_index; // if endpoint_count = 3, then backend_index = 1/2/3
} endpoint_key;

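To make the new key layout concrete, a small illustrative Go mirror (struct and values are hypothetical, for exposition only): endpoints are bucketed per (service, priority), and indices restart at 1 inside each bucket.

```go
package main

import "fmt"

// endpointKey mirrors the C endpoint_key above (illustrative only).
type endpointKey struct {
	ServiceID    uint32
	Prio         uint32 // 0 = best locality match, PRIO_COUNT-1 = worst
	BackendIndex uint32 // 1-based inside the (service, prio) bucket
}

func main() {
	// A hypothetical service 7 with two endpoints matching all scopes (prio 0)
	// and one endpoint matching none (prio 6):
	keys := []endpointKey{
		{ServiceID: 7, Prio: 0, BackendIndex: 1},
		{ServiceID: 7, Prio: 0, BackendIndex: 2},
		{ServiceID: 7, Prio: 6, BackendIndex: 1},
	}
	// The matching service_value would hold prio_endpoint_count = [2 0 0 0 0 0 1].
	fmt.Println(keys)
}
```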
2 changes: 2 additions & 0 deletions bpf/kmesh/workload/include/workload_common.h
Expand Up @@ -22,6 +22,8 @@
// loadbalance type
typedef enum {
LB_POLICY_RANDOM = 0,
LB_POLICY_STRICT = 1,
LB_POLICY_FAILOVER = 2,
} lb_policy_t;

#pragma pack(1)
10 changes: 9 additions & 1 deletion pkg/controller/workload/bpfcache/endpoint.go
@@ -21,8 +21,13 @@
	"istio.io/istio/pkg/util/sets"
)

+const (
+	PrioCount = 7
+)
+
type EndpointKey struct {
	ServiceId    uint32 // service id
+	Prio         uint32 // 0 means the highest priority, PrioCount-1 the lowest
	BackendIndex uint32 // if endpoint_count = 3, then backend_index = 1/2/3
}

Expand Down Expand Up @@ -59,15 +64,17 @@
}

// EndpointSwap updates the last endpoint's index and removes the current endpoint
-func (c *Cache) EndpointSwap(currentIndex, lastIndex uint32, serviceId uint32) error {
+func (c *Cache) EndpointSwap(currentIndex, lastIndex uint32, serviceId uint32, prio uint32) error {
	if currentIndex == lastIndex {
		return c.EndpointDelete(&EndpointKey{
			ServiceId:    serviceId,
			Prio:         prio,
			BackendIndex: lastIndex,
		})
	}
	lastKey := &EndpointKey{
		ServiceId:    serviceId,
		Prio:         prio,
		BackendIndex: lastIndex,
	}
	lastValue := &EndpointValue{}
@@ -77,6 +84,7 @@

	currentKey := &EndpointKey{
		ServiceId:    serviceId,
		Prio:         prio,
		BackendIndex: currentIndex,
	}
	currentValue := &EndpointValue{}
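The swap-remove pattern above keeps backend indices dense within each (service, prio) bucket, so the BPF side can keep drawing random indices in 1..count. A standalone Go sketch of the idea (a plain map stands in for the BPF map; names are hypothetical):

```go
package main

import "fmt"

// swapRemove deletes entry `current` from a 1..n indexed bucket by moving the
// last entry into its slot, so indices stay contiguous for random selection.
func swapRemove(bucket map[uint32]string, current, last uint32) {
	if current != last {
		bucket[current] = bucket[last] // overwrite the vacated slot with the tail entry
	}
	delete(bucket, last) // the tail slot is now stale either way
}

func main() {
	bucket := map[uint32]string{1: "a", 2: "b", 3: "c"}
	swapRemove(bucket, 1, 3) // remove "a": "c" moves into slot 1
	fmt.Println(bucket)      // map[1:c 2:b]
}
```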
100 changes: 100 additions & 0 deletions pkg/controller/workload/bpfcache/locality_cache.go
@@ -0,0 +1,100 @@
/*
* Copyright The Kmesh Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package bpfcache

import (
	"sync"

	"kmesh.net/kmesh/api/v2/workloadapi"
)

// localityInfo records the local node's workload locality info
type localityInfo struct {
	region    string // init from workload.GetLocality().GetRegion()
	zone      string // init from workload.GetLocality().GetZone()
	subZone   string // init from workload.GetLocality().GetSubZone()
	nodeName  string // init from os.Getenv("NODE_NAME"), workload.GetNode()
	clusterId string // init from workload.GetClusterId()
	network   string // workload.GetNetwork()
}

type LocalityCache struct {
	mutex        sync.RWMutex
	LocalityInfo *localityInfo
}

func NewLocalityCache() LocalityCache {
	return LocalityCache{
		LocalityInfo: nil,
	}
}

func (l *LocalityCache) SetLocality(nodeName, clusterId, network string, locality *workloadapi.Locality) {
	l.mutex.Lock()
	defer l.mutex.Unlock()
	if l.LocalityInfo == nil {
		l.LocalityInfo = &localityInfo{}
	}

	// note: nodeName should be set by the processor or os.Getenv("NODE_NAME")
	l.LocalityInfo.region = locality.GetRegion()
	l.LocalityInfo.zone = locality.GetZone()
	l.LocalityInfo.subZone = locality.GetSubzone()
	l.LocalityInfo.nodeName = nodeName
	l.LocalityInfo.clusterId = clusterId
	l.LocalityInfo.network = network
}
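A hypothetical wiring example (the caller, the cluster and network literals, and the env-var provenance are assumptions, not code from this PR): the workload processor seeds the cache once from the local node's metadata.

```go
package main

import (
	"os"

	"kmesh.net/kmesh/api/v2/workloadapi"
	"kmesh.net/kmesh/pkg/controller/workload/bpfcache"
)

func main() {
	lc := bpfcache.NewLocalityCache()
	// NODE_NAME would come from the daemon's pod spec via the downward API (assumption).
	lc.SetLocality(os.Getenv("NODE_NAME"), "cluster0", "network0", &workloadapi.Locality{
		Region:  "region0",
		Zone:    "zone0",
		Subzone: "subzone0",
	})
}
```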

func (l *LocalityCache) CalcLocalityLBPrio(wl *workloadapi.Workload, rp []workloadapi.LoadBalancing_Scope) uint32 {
	var rank uint32 = 0
	for _, scope := range rp {
		match := false
		switch scope {
		case workloadapi.LoadBalancing_REGION:
			if l.LocalityInfo.region == wl.GetLocality().GetRegion() {
				match = true
			}
		case workloadapi.LoadBalancing_ZONE:
			if l.LocalityInfo.zone == wl.GetLocality().GetZone() {
				match = true
			}
		case workloadapi.LoadBalancing_SUBZONE:
			if l.LocalityInfo.subZone == wl.GetLocality().GetSubzone() {
				match = true
			}
		case workloadapi.LoadBalancing_NODE:
			if l.LocalityInfo.nodeName == wl.GetNode() {
				match = true
			}
		case workloadapi.LoadBalancing_CLUSTER:
			if l.LocalityInfo.clusterId == wl.GetClusterId() {
				match = true
			}
		case workloadapi.LoadBalancing_NETWORK:
			log.Debugf("l.LocalityInfo.network %#v, wl.GetNetwork() %#v", l.LocalityInfo.network, wl.GetNetwork())
			if l.LocalityInfo.network == wl.GetNetwork() {
				match = true
			}
		}
		if match {
			rank++
		} else {
			break
		}
	}
	return uint32(len(rp)) - rank
}

Member: This method is not right. Keep in mind, we should match rp []workloadapi.LoadBalancing_Scope in order; if one scope does not match, we should stop. So I am quite sure this result is not right.

Contributor (Author): Fixed it.

Member: There is no test coverage; I will write a test later to exercise your function. I feel this function is still not correct.
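Following up on that thread, a sketch of the kind of test the reviewer asks for (values and expectations are hypothetical): it encodes the prefix-matching rule, where the first mismatching scope stops the walk even if later scopes would match.

```go
package bpfcache

import (
	"testing"

	"kmesh.net/kmesh/api/v2/workloadapi"
)

func TestCalcLocalityLBPrioStopsAtFirstMismatch(t *testing.T) {
	lc := NewLocalityCache()
	lc.SetLocality("node-a", "cluster-1", "net-1", &workloadapi.Locality{
		Region: "r1", Zone: "z1", Subzone: "s1",
	})

	rp := []workloadapi.LoadBalancing_Scope{
		workloadapi.LoadBalancing_REGION,
		workloadapi.LoadBalancing_ZONE,
		workloadapi.LoadBalancing_SUBZONE,
	}

	// Region matches, zone does not: the walk must stop at ZONE (rank 1),
	// giving prio len(rp)-rank = 2, even though the subzone would match.
	wl := &workloadapi.Workload{
		Locality: &workloadapi.Locality{Region: "r1", Zone: "other", Subzone: "s1"},
	}
	if got := lc.CalcLocalityLBPrio(wl, rp); got != 2 {
		t.Fatalf("want prio 2, got %d", got)
	}
}
```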