64 changes: 59 additions & 5 deletions bpf/kmesh/workload/include/service.h
@@ -17,9 +17,16 @@ static inline int lb_random_handle(struct kmesh_context *kmesh_ctx, __u32 servic
int ret = 0;
endpoint_key endpoint_k = {0};
endpoint_value *endpoint_v = NULL;
int rand_k = 0;

if (service_v->prio_endpoint_count[0] == 0)
return 0;

endpoint_k.service_id = service_id;
endpoint_k.backend_index = bpf_get_prandom_u32() % service_v->endpoint_count + 1;
endpoint_k.prio = 0; // for random handling, all endpoints are stored at the highest priority

rand_k = bpf_get_prandom_u32() % service_v->prio_endpoint_count[0] + 1;
endpoint_k.backend_index = rand_k;

endpoint_v = map_lookup_endpoint(&endpoint_k);
if (!endpoint_v) {
@@ -37,6 +44,50 @@
return 0;
}

static inline int
lb_locality_failover_handle(struct kmesh_context *kmesh_ctx, __u32 service_id, service_value *service_v, bool is_strict)
{
int ret = 0;
uint32_t rand_k = 0;
endpoint_key endpoint_k = {0};
endpoint_value *endpoint_v = NULL;
endpoint_k.service_id = service_id;
struct ip_addr zero_addr = {0};
__u32 zero_port = 0;

// #pragma unroll
for (int match_prio = 0; match_prio < PRIO_COUNT; match_prio++) {
endpoint_k.prio = match_prio; // 0->6
// if we have endpoints in this prio
if (service_v->prio_endpoint_count[match_prio] > 0) {
rand_k = bpf_get_prandom_u32() % service_v->prio_endpoint_count[match_prio] + 1;
endpoint_k.backend_index = rand_k;
endpoint_v = map_lookup_endpoint(&endpoint_k);
if (!endpoint_v) {
BPF_LOG(
ERR, SERVICE, "find endpoint [%u/%u/%u] failed", service_id, match_prio, endpoint_k.backend_index);
return -ENOENT;
}
ret = endpoint_manager(kmesh_ctx, endpoint_v, service_id, service_v);
if (ret != 0) {
if (ret != -ENOENT)
BPF_LOG(ERR, SERVICE, "endpoint_manager failed, ret:%d\n", ret);
return ret;
}
BPF_LOG(DEBUG, SERVICE, "locality loadbalance matched backend_uid %d\n", endpoint_v->backend_uid);
return 0; // find the backend successfully
}
if (is_strict) { // only match max priority in strict mode
kmesh_ctx->dnat_ip = zero_addr;
kmesh_ctx->dnat_port = zero_port;
BPF_LOG(DEBUG, SERVICE, "locality loadbalance match nothing in STRICT mode\n");
return -ENOENT;
}
}
// no backend matched
return -ENOENT;
}

static inline int service_manager(struct kmesh_context *kmesh_ctx, __u32 service_id, service_value *service_v)
{
int ret = 0;
@@ -55,15 +106,18 @@ static inline int service_manager(struct kmesh_context *kmesh_ctx, __u32 service
return ret;
}

if (service_v->endpoint_count == 0) {
BPF_LOG(DEBUG, SERVICE, "service %u has no endpoint", service_id);
return 0;
}
BPF_LOG(DEBUG, SERVICE, "service [%u] policy [%u] failed", service_id, service_v->lb_policy);

switch (service_v->lb_policy) {
case LB_POLICY_RANDOM:
ret = lb_random_handle(kmesh_ctx, service_id, service_v);
break;
case LB_POLICY_STRICT:
ret = lb_locality_failover_handle(kmesh_ctx, service_id, service_v, true);
break;
case LB_POLICY_FAILOVER:
ret = lb_locality_failover_handle(kmesh_ctx, service_id, service_v, false);
break;
default:
BPF_LOG(ERR, SERVICE, "unsupported load balance type:%u\n", service_v->lb_policy);
ret = -EINVAL;
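For orientation, the new lb_locality_failover_handle walks priorities 0 through 6, picks a random backend at the first non-empty priority, and in strict mode bails out as soon as priority 0 is empty. The following is a minimal Go model of that selection logic, for illustration only: the real data path is the BPF C above, the helper name selectBackend is made up, and math/rand stands in for bpf_get_prandom_u32.

package main

import (
    "errors"
    "fmt"
    "math/rand"
)

const prioCount = 7 // mirrors PRIO_COUNT in workload.h

// selectBackend models lb_locality_failover_handle: prioEndpointCount[i] holds the
// number of endpoints stored for a service at priority i (0 = best match, 6 = worst).
// It returns the (prio, backendIndex) pair to look up; backendIndex is 1-based,
// matching how the BPF code computes rand_k = prandom % count + 1.
func selectBackend(prioEndpointCount [prioCount]uint32, strict bool) (uint32, uint32, error) {
    for prio := uint32(0); prio < prioCount; prio++ {
        if n := prioEndpointCount[prio]; n > 0 {
            return prio, rand.Uint32()%n + 1, nil
        }
        if strict { // strict mode only accepts the highest priority
            return 0, 0, errors.New("no endpoint at the highest priority")
        }
    }
    return 0, 0, errors.New("no endpoint at any priority")
}

func main() {
    counts := [prioCount]uint32{0, 0, 3} // endpoints exist only at priority 2
    if prio, idx, err := selectBackend(counts, false); err == nil {
        fmt.Printf("failover picks prio=%d backend_index=%d\n", prio, idx)
    }
    if _, _, err := selectBackend(counts, true); err != nil {
        fmt.Println("strict mode:", err) // fails because priority 0 is empty
    }
}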
7 changes: 5 additions & 2 deletions bpf/kmesh/workload/include/workload.h
@@ -10,6 +10,7 @@
#define MAX_PORT_COUNT 10
#define MAX_SERVICE_COUNT 10
#define RINGBUF_SIZE (1 << 12)
#define PRIO_COUNT 7
#define MAX_MEMBER_NUM_PER_POLICY 4

#pragma pack(1)
@@ -28,8 +29,9 @@ typedef struct {
} service_key;

typedef struct {
__u32 endpoint_count; // endpoint count of current service
__u32 lb_policy; // load balancing algorithm, currently only supports random algorithm
__u32 prio_endpoint_count[PRIO_COUNT]; // endpoint count of the current service at each priority
__u32 lb_policy; // load balancing algorithm; currently supports random and locality load balancing
// (failover/strict mode)
__u32 service_port[MAX_PORT_COUNT]; // service_port[i] and target_port[i] are a pair, i starts from 0 and max value
// is MAX_PORT_COUNT-1
__u32 target_port[MAX_PORT_COUNT];
@@ -40,6 +42,7 @@
// endpoint map
typedef struct {
__u32 service_id; // service id
__u32 prio; // 0 means the highest priority (all scopes matched), 6 means the lowest priority
__u32 backend_index; // if endpoint_count = 3, then backend_index = 1/2/3
} endpoint_key;

2 changes: 2 additions & 0 deletions bpf/kmesh/workload/include/workload_common.h
@@ -22,6 +22,8 @@
// loadbalance type
typedef enum {
LB_POLICY_RANDOM = 0,
LB_POLICY_STRICT = 1,
LB_POLICY_FAILOVER = 2,
} lb_policy_t;

#pragma pack(1)
10 changes: 9 additions & 1 deletion pkg/controller/workload/bpfcache/endpoint.go
@@ -21,8 +21,13 @@
"istio.io/istio/pkg/util/sets"
)

const (
PrioCount = 7
)

type EndpointKey struct {
ServiceId uint32 // service id
Prio uint32
BackendIndex uint32 // if endpoint_count = 3, then backend_index = 1/2/3
}

Expand Down Expand Up @@ -59,15 +64,17 @@
}

// EndpointSwap update the last endpoint index and remove the current endpoint
func (c *Cache) EndpointSwap(currentIndex, lastIndex uint32, serviceId uint32) error {
func (c *Cache) EndpointSwap(currentIndex, lastIndex uint32, serviceId uint32, prio uint32) error {
if currentIndex == lastIndex {
return c.EndpointDelete(&EndpointKey{
ServiceId: serviceId,
Prio: prio,
BackendIndex: lastIndex,
})
}
lastKey := &EndpointKey{
ServiceId: serviceId,
Prio: prio,
BackendIndex: lastIndex,
}
lastValue := &EndpointValue{}
@@ -77,6 +84,7 @@

currentKey := &EndpointKey{
ServiceId: serviceId,
Prio: prio,
BackendIndex: currentIndex,
}
currentValue := &EndpointValue{}
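As context for the EndpointSwap change above: backend_index values must stay contiguous within each (service_id, prio) bucket, so deleting endpoint i copies the endpoint stored at the last index of that priority into slot i and then drops the last slot. Below is a rough sketch of that swap-delete pattern on a plain Go map; the helper swapDelete is hypothetical and not the PR's BPF-map-backed implementation.

package main

import "fmt"

// endpointKey mirrors the (service_id, prio, backend_index) key layout; backendIndex
// is 1-based within each (serviceID, prio) bucket.
type endpointKey struct {
    serviceID, prio, backendIndex uint32
}

// swapDelete removes the endpoint at currentIndex by copying the value stored at
// lastIndex into its slot and then deleting lastIndex, so indices stay contiguous.
func swapDelete(m map[endpointKey]uint32, serviceID, prio, currentIndex, lastIndex uint32) {
    if currentIndex != lastIndex {
        m[endpointKey{serviceID, prio, currentIndex}] = m[endpointKey{serviceID, prio, lastIndex}]
    }
    delete(m, endpointKey{serviceID, prio, lastIndex})
}

func main() {
    m := map[endpointKey]uint32{
        {serviceID: 1, prio: 0, backendIndex: 1}: 100,
        {serviceID: 1, prio: 0, backendIndex: 2}: 200,
        {serviceID: 1, prio: 0, backendIndex: 3}: 300,
    }
    swapDelete(m, 1, 0, 1, 3) // drop backend 100; backend 300 now lives at index 1
    fmt.Println(len(m), m[endpointKey{serviceID: 1, prio: 0, backendIndex: 1}]) // 2 300
}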
100 changes: 100 additions & 0 deletions pkg/controller/workload/bpfcache/locality_cache.go
@@ -0,0 +1,100 @@
/*
* Copyright The Kmesh Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package bpfcache

import (
"sync"

"kmesh.net/kmesh/api/v2/workloadapi"
)

// localityInfo records local node workload locality info
type localityInfo struct {
region string // init from workload.GetLocality().GetRegion()
zone string // init from workload.GetLocality().GetZone()
subZone string // init from workload.GetLocality().GetSubZone()
nodeName string // init from os.Getenv("NODE_NAME"), workload.GetNode()
clusterId string // init from workload.GetClusterId()
network string // workload.GetNetwork()
}

type LocalityCache struct {
mutex sync.RWMutex
LocalityInfo *localityInfo
}

func NewLocalityCache() LocalityCache {
return LocalityCache{
LocalityInfo: nil,
}
}

func (l *LocalityCache) SetLocality(nodeName, clusterId, network string, locality *workloadapi.Locality) {
l.mutex.Lock()
defer l.mutex.Unlock()
if l.LocalityInfo == nil {
l.LocalityInfo = &localityInfo{}
}

// note: nodeName should be set by the processor or os.Getenv("NODE_NAME")
l.LocalityInfo.region = locality.GetRegion()
l.LocalityInfo.zone = locality.GetZone()
l.LocalityInfo.subZone = locality.GetSubzone()
l.LocalityInfo.nodeName = nodeName
l.LocalityInfo.clusterId = clusterId
l.LocalityInfo.network = network
}

func (l *LocalityCache) CalcLocalityLBPrio(wl *workloadapi.Workload, rp []workloadapi.LoadBalancing_Scope) uint32 {
var rank uint32 = 0
for _, scope := range rp {
match := false
switch scope {
case workloadapi.LoadBalancing_REGION:
if l.LocalityInfo.region == wl.GetLocality().GetRegion() {
match = true
}
case workloadapi.LoadBalancing_ZONE:
if l.LocalityInfo.zone == wl.GetLocality().GetZone() {
match = true
}
case workloadapi.LoadBalancing_SUBZONE:
if l.LocalityInfo.subZone == wl.GetLocality().GetSubzone() {
match = true
}
case workloadapi.LoadBalancing_NODE:
if l.LocalityInfo.nodeName == wl.GetNode() {
match = true
}
case workloadapi.LoadBalancing_CLUSTER:
if l.LocalityInfo.clusterId == wl.GetClusterId() {
match = true
}
case workloadapi.LoadBalancing_NETWORK:
log.Debugf("l.LocalityInfo.network %#v, wl.GetNetwork() %#v", l.LocalityInfo.network, wl.GetNetwork())
if l.LocalityInfo.network == wl.GetNetwork() {
match = true
}
}
if match {
rank++
} else {
break
}
}
return uint32(len(rp)) - rank
Review comment (Member):
This method is not right. Keep in mind, we should match rp []workloadapi.LoadBalancing_Scope in order; if one does not match, we should stop. So I am quite sure this result is not right now.

Reply (Contributor, Author):
Fixed it.

Reply (Member):
There is no test coverage; I will write a test later to exercise your function. I feel this function is still not correct.
}
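To make the in-order matching discussed above concrete, here is a hypothetical, in-package example of how CalcLocalityLBPrio behaves as currently written. It is not part of the PR, the workloadapi field names (Locality, Region, Zone, Subzone, Node) are inferred from the getters used in this file, and, per the review thread, the calculation itself may still change. With scopes [REGION, ZONE, NODE], a workload in the same region and zone but on a different node matches the first two scopes in order and stops at NODE, so rank = 2 and the returned priority is 3 - 2 = 1 (0 would mean every scope matched).

package bpfcache

import (
    "fmt"

    "kmesh.net/kmesh/api/v2/workloadapi"
)

// ExampleCalcLocalityLBPrio is a hypothetical illustration of the priority
// calculation above; it is not part of the PR.
func ExampleCalcLocalityLBPrio() {
    lc := NewLocalityCache()
    lc.SetLocality("node-a", "cluster-1", "network-1",
        &workloadapi.Locality{Region: "r1", Zone: "z1", Subzone: "sz1"})

    wl := &workloadapi.Workload{
        Node:     "node-b", // differs from the local node, so the NODE scope fails
        Locality: &workloadapi.Locality{Region: "r1", Zone: "z1", Subzone: "sz1"},
    }
    rp := []workloadapi.LoadBalancing_Scope{
        workloadapi.LoadBalancing_REGION,
        workloadapi.LoadBalancing_ZONE,
        workloadapi.LoadBalancing_NODE,
    }
    // REGION and ZONE match in order, NODE does not: rank = 2, prio = 3 - 2 = 1.
    fmt.Println(lc.CalcLocalityLBPrio(wl, rp))
    // Output: 1
}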