Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kmesh ads mode cluster add consistent hash lb #888

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/cluster/cluster.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ message Cluster {
ROUND_ROBIN = 0;
LEAST_REQUEST = 1;
RANDOM = 3;
MAGLEV = 5;
}

core.ApiStatus api_status = 128;
Expand Down
16 changes: 16 additions & 0 deletions api/route/route_components.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,26 @@ message RouteAction {
// assigned to each cluster.
WeightedCluster weighted_clusters = 3;
}

// HashPolicy names one source from which a hash key is obtained for a
// request handled by this route action.
message HashPolicy {
// Header selects a request header as the source of the hash key.
message Header {
// The name of the request header that will be used to obtain the hash
// key. If the request header is not present, no hash will be produced.
string header_name = 1;
}

// At most one specifier is set; only the header specifier is defined here.
oneof policy_specifier {

// Header hash policy.
Header header = 1;
}
}

// the matched prefix (or path) should be swapped with this value.
string prefix_rewrite = 5;
uint32 timeout = 8;
RetryPolicy retry_policy = 9;
repeated HashPolicy hash_policy = 15;
}

message RetryPolicy {
Expand Down
116 changes: 113 additions & 3 deletions bpf/kmesh/ads/include/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include "endpoint/endpoint.pb-c.h"

#define CLUSTER_NAME_MAX_LEN BPF_DATA_MAX_LEN
#define MAGLEV_TABLE_SIZE 16381
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is table size set to this?

#define HASH_INIT4_SEED 0xcafe

struct {
__uint(type, BPF_MAP_TYPE_HASH);
Expand All @@ -20,6 +22,22 @@ struct {
__uint(max_entries, MAP_SIZE_OF_CLUSTER);
} map_of_cluster SEC(".maps");

/*
 * Inner-map template for a cluster's maglev lookup table: a one-entry array
 * whose single value is MAGLEV_TABLE_SIZE consecutive __u32 slots.
 * The maglev selector reads one slot (chosen by hash % MAGLEV_TABLE_SIZE) and
 * uses the stored number as an index into the cluster's ep_identity array.
 */
struct inner_of_maglev {
__uint(type, BPF_MAP_TYPE_ARRAY);
__uint(max_entries, 1);
__uint(key_size, sizeof(__u32));
__uint(value_size, sizeof(__u32) * MAGLEV_TABLE_SIZE);
};

/*
 * Hash-of-maps keyed by cluster name (CLUSTER_NAME_MAX_LEN bytes). Each value
 * refers to an inner map shaped like struct inner_of_maglev, i.e. the maglev
 * lookup table of that cluster. Created without preallocation.
 * NOTE(review): entries are presumably installed from user space when a
 * cluster uses the MAGLEV policy — confirm against the control-plane code.
 */
struct {
__uint(type, BPF_MAP_TYPE_HASH_OF_MAPS);
__uint(key_size, CLUSTER_NAME_MAX_LEN);
__uint(value_size, sizeof(__u32));
__uint(max_entries, MAP_SIZE_OF_CLUSTER);
__uint(map_flags, BPF_F_NO_PREALLOC);
__array(values, struct inner_of_maglev);
} outer_of_maglev SEC(".maps");

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to use the outer-inner map structure here?

struct cluster_endpoints {
__u32 ep_num;
/* */
Expand Down Expand Up @@ -63,6 +81,16 @@ static inline struct cluster_endpoints *map_lookup_cluster_eps(const char *clust
return kmesh_map_lookup_elem(&map_of_cluster_eps, cluster_name);
}

/*
 * Look up the maglev inner map registered for a cluster.
 *
 * @cluster_name: key into outer_of_maglev.
 *
 * Returns the result of the outer-map lookup, or NULL when the cluster has no
 * entry. The maglev selector passes the returned pointer on as the map
 * argument of the inner lookup rather than dereferencing it as data.
 */
static inline __u32 *map_lookup_cluster_inner_map(const char *cluster_name)
{
    __u32 *inner_map = kmesh_map_lookup_elem(&outer_of_maglev, cluster_name);

    return inner_map;
}

/*
 * Fetch the load-balancing hash stored in the single slot (key 0) of
 * map_of_lb_hash.
 *
 * Returns a pointer to the stored value, or NULL when the lookup fails.
 * The maglev selector treats a stored value of 0 as "no hash available".
 */
static inline __u32 *map_lookup_lb_hash(void)
{
    int location = 0;

    return kmesh_map_lookup_elem(&map_of_lb_hash, &location);
}

static inline int map_add_cluster_eps(const char *cluster_name, const struct cluster_endpoints *eps)
{
return kmesh_map_update_elem(&map_of_cluster_eps, cluster_name, eps);
Expand Down Expand Up @@ -219,14 +247,95 @@ static inline void *loadbalance_round_robin(struct cluster_endpoints *eps)
return (void *)eps->ep_identity[idx];
}

static inline void *cluster_get_ep_identity_by_lb_policy(struct cluster_endpoints *eps, __u32 lb_policy)
/* The daddr is explicitly excluded from the hash here in order to allow for
* backend selection to choose the same backend even on different service VIPs.
*/
// static __always_inline __u32 hash_from_tuple_v4(struct bpf_sock * sk)
// {

// BPF_LOG(INFO, CLUSTER, "sk: src_ip is:%x, src_port is:%d\n", sk->src_ip4,sk->src_port);
// BPF_LOG(INFO, CLUSTER, "sk: dst_port is:%d, sk->protocol is: %x\n",sk->dst_port,sk->protocol);
// return jhash_3words(sk->src_ip4,
// ((__u32) sk->dst_port << 16) | sk->src_port,
// sk->protocol, HASH_INIT4_SEED);
// }

/*
 * Read one __u32 out of a map value using hand-written BPF assembly, so the
 * compiler cannot rewrite the bounded access into a shape the verifier
 * rejects.
 *
 * NOTE(review): this appears to be adapted from the helper of the same name
 * in Cilium's BPF library — confirm and add a source reference.
 *
 * @array: base address of the __u32 table.
 * @index: element index (not a byte offset); multiplied by 4 inside the asm.
 * @limit: compile-time constant ("i" constraint) compared against the BYTE
 *         offset index << 2. Callers must therefore pass the largest valid
 *         byte offset, (element_count - 1) << 2, not the element count.
 *
 * Returns array[index]. If the byte offset exceeds @limit the pointer
 * adjustment is skipped and array[0] is returned instead, so an undersized
 * @limit silently maps high indices onto slot 0.
 */
static __always_inline __u32 map_array_get_32(__u32 *array, __u32 index, const __u32 limit)
{
    __u32 datum = 0;
    /*
     * The asm below overwrites both the offset and the pointer. Operands that
     * an asm statement modifies must not be input-only, so work on local
     * copies declared read-write; the caller's values stay intact after
     * inlining.
     */
    __u32 *slot = array;
    __u32 offset = index;

    /* LLVM tends to optimize code away that is needed for the verifier to
     * understand dynamic map access. Input constraint is that the scaled
     * index stays within limit for this util function, so we never fail
     * here, and returned datum is always valid.
     */
    asm volatile("%[offset] <<= 2\n\t"
                 "if %[offset] > %[limit] goto +1\n\t"
                 "%[slot] += %[offset]\n\t"
                 "%[datum] = *(u32 *)(%[slot] + 0)\n\t"
                 : [datum] "=r"(datum), [slot] "+r"(slot), [offset] "+r"(offset)
                 : [limit] "i"(limit)
                 : /* no clobbers */);

    return datum;
}

/*
 * Pick an endpoint of a cluster through its maglev lookup table.
 *
 * @eps:  endpoint set of the cluster; NULL or an empty set yields NULL.
 * @name: cluster name, used as the key into outer_of_maglev.
 * @ctx:  program context; currently unused.
 *
 * The hash comes from map_of_lb_hash. When no hash is stored (lookup fails or
 * the value is 0) a pseudo-random number is used instead, so the request is
 * still spread over the table.
 *
 * Returns the selected ep_identity entry, or NULL when the cluster has no
 * maglev table or the table slot holds an out-of-range endpoint index.
 */
static inline void *loadbalance_maglev_select_backend(struct cluster_endpoints *eps, char *name, ctx_buff_t *ctx)
{
    __u32 inner_key = 0;
    __u32 *inner_map;
    __u32 *backend_ids;
    __u32 *hash_ptr;
    __u32 hash;
    __u32 index;
    __u32 id;

    if (!eps || eps->ep_num == 0)
        return NULL;

    inner_map = map_lookup_cluster_inner_map(name);
    if (!inner_map)
        return NULL;

    backend_ids = bpf_map_lookup_elem(inner_map, &inner_key);
    if (!backend_ids)
        return NULL;

    hash_ptr = map_lookup_lb_hash();
    if (!hash_ptr || *hash_ptr == 0) {
        hash = bpf_get_prandom_u32();
    } else {
        hash = *hash_ptr;
        BPF_LOG(INFO, CLUSTER, "lb_policy is maglev, got a hash value:%u\n", hash);
    }

    /* The modulo already guarantees index < MAGLEV_TABLE_SIZE. */
    index = hash % MAGLEV_TABLE_SIZE;

    /*
     * map_array_get_32() compares its limit against the byte offset
     * (index << 2), so the limit must be the largest valid byte offset.
     * Passing the element count would make every index above
     * MAGLEV_TABLE_SIZE / 4 read slot 0 instead of its own slot.
     */
    id = map_array_get_32(backend_ids, index, (MAGLEV_TABLE_SIZE - 1) << 2);
    BPF_LOG(INFO, CLUSTER, "lb_policy is maglev, select backend id:%u\n", id);
    if (id >= KMESH_PER_ENDPOINT_NUM)
        return NULL;

    return (void *)eps->ep_identity[id];
}

static inline void *
cluster_get_ep_identity_by_lb_policy(struct cluster_endpoints *eps, __u32 lb_policy, char *name, ctx_buff_t *ctx)
{
void *ep_identity = NULL;

switch (lb_policy) {
case CLUSTER__CLUSTER__LB_POLICY__ROUND_ROBIN:
ep_identity = loadbalance_round_robin(eps);
break;
case CLUSTER__CLUSTER__LB_POLICY__MAGLEV:
ep_identity = loadbalance_maglev_select_backend(eps, name, ctx);
break;
default:
BPF_LOG(INFO, CLUSTER, "%d lb_policy is unsupported, default:ROUND_ROBIN\n", lb_policy);
ep_identity = loadbalance_round_robin(eps);
Expand Down Expand Up @@ -254,7 +363,8 @@ static inline Core__SocketAddress *cluster_get_ep_sock_addr(const void *ep_ident
return sock_addr;
}

static inline int cluster_handle_loadbalance(Cluster__Cluster *cluster, address_t *addr, ctx_buff_t *ctx)
static inline int
cluster_handle_loadbalance(Cluster__Cluster *cluster, address_t *addr, ctx_buff_t *ctx, const char *cluster_name)
{
char *name = NULL;
void *ep_identity = NULL;
Expand All @@ -273,7 +383,7 @@ static inline int cluster_handle_loadbalance(Cluster__Cluster *cluster, address_
return -EAGAIN;
}

ep_identity = cluster_get_ep_identity_by_lb_policy(eps, cluster->lb_policy);
ep_identity = cluster_get_ep_identity_by_lb_policy(eps, cluster->lb_policy, cluster_name, ctx);
if (!ep_identity) {
BPF_LOG(ERR, CLUSTER, "cluster=\"%s\" handle lb failed\n", name);
return -EAGAIN;
Expand Down
28 changes: 15 additions & 13 deletions bpf/kmesh/ads/include/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,19 @@

// ************
// array len
#define KMESH_NAME_LEN 64
#define KMESH_TYPE_LEN 64
#define KMESH_HOST_LEN 128
#define KMESH_FILTER_CHAINS_LEN 64
#define KMESH_HTTP_DOMAIN_NUM 32
#define KMESH_HTTP_DOMAIN_LEN 128
#define KMESH_PER_FILTER_CHAIN_NUM MAP_SIZE_OF_PER_FILTER_CHAIN
#define KMESH_PER_FILTER_NUM MAP_SIZE_OF_PER_FILTER
#define KMESH_PER_VIRT_HOST_NUM MAP_SIZE_OF_PER_VIRTUAL_HOST
#define KMESH_PER_ROUTE_NUM MAP_SIZE_OF_PER_ROUTE
#define KMESH_PER_ENDPOINT_NUM MAP_SIZE_OF_PER_ENDPOINT
#define KMESH_PER_HEADER_MUM 32
#define KMESH_PER_WEIGHT_CLUSTER_NUM 32
#define KMESH_NAME_LEN 64
#define KMESH_TYPE_LEN 64
#define KMESH_HOST_LEN 128
#define KMESH_FILTER_CHAINS_LEN 64
#define KMESH_HTTP_DOMAIN_NUM 32
#define KMESH_HTTP_DOMAIN_LEN 128
#define KMESH_PER_FILTER_CHAIN_NUM MAP_SIZE_OF_PER_FILTER_CHAIN
#define KMESH_PER_FILTER_NUM MAP_SIZE_OF_PER_FILTER
#define KMESH_PER_VIRT_HOST_NUM MAP_SIZE_OF_PER_VIRTUAL_HOST
#define KMESH_PER_ROUTE_NUM MAP_SIZE_OF_PER_ROUTE
#define KMESH_PER_ENDPOINT_NUM MAP_SIZE_OF_PER_ENDPOINT
#define KMESH_PER_HEADER_MUM 32
#define KMESH_PER_WEIGHT_CLUSTER_NUM 32
#define KMESH_PER_HASH_POLICY_NUM 1
#define KMESH_PER_HASH_POLICY_MSG_HEADER_LEN 10
#endif // _CONFIG_H_
7 changes: 7 additions & 0 deletions bpf/kmesh/ads/include/kmesh_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ static inline char *bpf_strncpy(char *dst, int n, const char *src)
}
#endif

/*
 * Per-CPU, single-slot scratch map carrying the consistent-hash value of the
 * request being processed. The route-config stage stores the value under
 * key 0 (map_add_lb_hash) and the maglev selector reads it back
 * (map_lookup_lb_hash); a stored 0 is treated by the reader as "no hash".
 * NOTE(review): correctness relies on writer and reader running on the same
 * CPU without another request in between — confirm.
 */
struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__uint(key_size, sizeof(int));
__uint(value_size, sizeof(__u32));
__uint(max_entries, 1);
} map_of_lb_hash SEC(".maps");

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a descriptive comment to this map.

typedef enum {
KMESH_TAIL_CALL_LISTENER = 1,
KMESH_TAIL_CALL_FILTER_CHAIN,
Expand Down
84 changes: 84 additions & 0 deletions bpf/kmesh/ads/include/route_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,89 @@ static inline char *route_get_cluster(const Route__Route *route)
return kmesh_get_ptr_val(_(route_act->cluster));
}

/*
 * Derive a load-balancing hash from the value of a request header.
 *
 * @header_name: name of the header to hash; NULL means no header policy.
 *
 * At most the first KMESH_PER_HASH_POLICY_MSG_HEADER_LEN bytes of the header
 * value take part in the hash.
 *
 * Returns 0 when no hash can be produced (no header name, header missing,
 * empty value, or the copy fails); otherwise the computed hash.
 */
static inline __u32 lb_hash_based_header(unsigned char *header_name)
{
    __u32 hash = 0;
    struct bpf_mem_ptr *msg_header = NULL;
    char *msg_header_v = NULL;
    __u32 msg_header_len = 0;
    __u32 copy_len;
    __u32 k;
    __u32 c;
    char msg_header_cp[KMESH_PER_HASH_POLICY_MSG_HEADER_LEN] = {'\0'};

    if (!header_name)
        return hash;
    // when header name is not null, compute a hash value.
    BPF_LOG(INFO, ROUTER_CONFIG, "Got a header name:%s\n", header_name);
    msg_header = (struct bpf_mem_ptr *)bpf_get_msg_header_element(header_name);
    if (!msg_header)
        return hash;

    msg_header_v = _(msg_header->ptr);
    msg_header_len = _(msg_header->size);
    if (!msg_header_v || !msg_header_len)
        return hash;

    BPF_LOG(INFO, ROUTER_CONFIG, "Got a header value len:%u\n", msg_header_len);

    /*
     * The header value can be longer than the local buffer; never ask the
     * copy helper for more bytes than the buffer can hold.
     * NOTE(review): a value that fills the whole buffer may leave it without
     * a terminating NUL for the "%s" log below — confirm bpf_strncpy()'s
     * termination behaviour.
     */
    copy_len = msg_header_len;
    if (copy_len > KMESH_PER_HASH_POLICY_MSG_HEADER_LEN)
        copy_len = KMESH_PER_HASH_POLICY_MSG_HEADER_LEN;

    if (!bpf_strncpy(msg_header_cp, copy_len, msg_header_v))
        return hash;
    BPF_LOG(INFO, ROUTER_CONFIG, "Got a header value:%s\n", msg_header_cp);

    /*
     * Multiply-by-33 string hash over the copied bytes.
     * NOTE(review): the seed 5318 is presumably meant to be the conventional
     * djb2 seed 5381 — confirm; it is kept as-is so hash values do not change.
     */
    hash = 5318;
#pragma unroll
    for (k = 0; k < KMESH_PER_HASH_POLICY_MSG_HEADER_LEN; k++) {
        if (k >= copy_len)
            break;
        c = *((unsigned char *)msg_header_cp + k);
        hash = ((hash << 5) + hash) + c;
    }

    return hash;
}

/*
 * Store a load-balancing hash in the single slot (key 0) of map_of_lb_hash.
 *
 * @hash: value to publish.
 *
 * Returns the result of the map update.
 *
 * NOTE(review): every socket writes the same key, so a later request
 * overwrites the value of an earlier one — confirm that the stored hash is
 * always consumed before another request can update it.
 */
static inline int map_add_lb_hash(__u32 hash)
{
    int slot = 0;
    int ret = kmesh_map_update_elem(&map_of_lb_hash, &slot, &hash);

    return ret;
}

/*
 * Compute the consistent-hash value for a matched route and publish it via
 * map_add_lb_hash().
 *
 * The route action's hash policies are scanned (up to
 * KMESH_PER_HASH_POLICY_NUM entries) for the first header-based policy whose
 * header message can be resolved; that header's name is handed to
 * lb_hash_based_header(). When nothing usable is found the header name stays
 * NULL and the value produced for a NULL name is stored.
 */
static void consistent_hash_policy(const Route__Route *route)
{
    char *hdr_name = NULL;
    void *policies = NULL;
    uint32_t computed = 0;
    Route__RouteAction *action = kmesh_get_ptr_val(_(route->route));

    if (action)
        policies = kmesh_get_ptr_val(_(action->hash_policy));

    if (policies != NULL) {
        __u32 n;

#pragma unroll
        for (n = 0; n < KMESH_PER_HASH_POLICY_NUM; n++) {
            Route__RouteAction__HashPolicy *policy;
            Route__RouteAction__HashPolicy__Header *hdr;

            /* hash_policy is an array of pointers; resolve the n-th one. */
            policy = (Route__RouteAction__HashPolicy *)kmesh_get_ptr_val((void *)*((__u64 *)policies + n));
            if (!policy)
                break;

            /* Only header-based policies are handled. */
            if (policy->policy_specifier_case != ROUTE__ROUTE_ACTION__HASH_POLICY__POLICY_SPECIFIER_HEADER)
                continue;

            hdr = kmesh_get_ptr_val(_(policy->header));
            if (!hdr)
                continue;

            hdr_name = kmesh_get_ptr_val(_(hdr->header_name));
            break;
        }
    }

    computed = lb_hash_based_header(hdr_name);
    if (map_add_lb_hash(computed) != 0) {
        BPF_LOG(ERR, ROUTER_CONFIG, "failed to update cluster lb hash value\n");
    }
    BPF_LOG(INFO, ROUTER_CONFIG, "Got a lb hash:%u\n", computed);
}

SEC_TAIL(KMESH_PORG_CALLS, KMESH_TAIL_CALL_ROUTER_CONFIG)
int route_config_manager(ctx_buff_t *ctx)
{
Expand Down Expand Up @@ -373,6 +456,7 @@ int route_config_manager(ctx_buff_t *ctx)
}

cluster = route_get_cluster(route);
consistent_hash_policy(route);
if (!cluster) {
BPF_LOG(ERR, ROUTER_CONFIG, "failed to get cluster\n");
return KMESH_TAIL_CALL_RET(-1);
Expand Down
Loading
Loading