-
Notifications
You must be signed in to change notification settings - Fork 62
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kmesh ads mode cluster add consistent hash lb #888
base: main
Are you sure you want to change the base?
Changes from all commits
e4890d8
ee6b8d7
228c285
49d827b
ff0646b
ad8047c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,8 @@ | |
#include "endpoint/endpoint.pb-c.h" | ||
|
||
#define CLUSTER_NAME_MAX_LEN BPF_DATA_MAX_LEN | ||
#define MAGLEV_TABLE_SIZE 16381 | ||
#define HASH_INIT4_SEED 0xcafe | ||
|
||
struct { | ||
__uint(type, BPF_MAP_TYPE_HASH); | ||
|
@@ -20,6 +22,22 @@ struct { | |
__uint(max_entries, MAP_SIZE_OF_CLUSTER); | ||
} map_of_cluster SEC(".maps"); | ||
|
||
struct inner_of_maglev { | ||
__uint(type, BPF_MAP_TYPE_ARRAY); | ||
__uint(max_entries, 1); | ||
__uint(key_size, sizeof(__u32)); | ||
__uint(value_size, sizeof(__u32) * MAGLEV_TABLE_SIZE); | ||
}; | ||
|
||
struct { | ||
__uint(type, BPF_MAP_TYPE_HASH_OF_MAPS); | ||
__uint(key_size, CLUSTER_NAME_MAX_LEN); | ||
__uint(value_size, sizeof(__u32)); | ||
__uint(max_entries, MAP_SIZE_OF_CLUSTER); | ||
__uint(map_flags, BPF_F_NO_PREALLOC); | ||
__array(values, struct inner_of_maglev); | ||
} outer_of_maglev SEC(".maps"); | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it necessary to use the outer-inner map structure here? |
||
struct cluster_endpoints { | ||
__u32 ep_num; | ||
/* */ | ||
|
@@ -63,6 +81,16 @@ static inline struct cluster_endpoints *map_lookup_cluster_eps(const char *clust | |
return kmesh_map_lookup_elem(&map_of_cluster_eps, cluster_name); | ||
} | ||
|
||
static inline __u32 *map_lookup_cluster_inner_map(const char *cluster_name) | ||
{ | ||
return kmesh_map_lookup_elem(&outer_of_maglev, cluster_name); | ||
} | ||
|
||
static inline __u32 *map_lookup_lb_hash() { | ||
int location = 0; | ||
return kmesh_map_lookup_elem(&map_of_lb_hash,&location); | ||
} | ||
|
||
static inline int map_add_cluster_eps(const char *cluster_name, const struct cluster_endpoints *eps) | ||
{ | ||
return kmesh_map_update_elem(&map_of_cluster_eps, cluster_name, eps); | ||
|
@@ -219,14 +247,95 @@ static inline void *loadbalance_round_robin(struct cluster_endpoints *eps) | |
return (void *)eps->ep_identity[idx]; | ||
} | ||
|
||
static inline void *cluster_get_ep_identity_by_lb_policy(struct cluster_endpoints *eps, __u32 lb_policy) | ||
/* The daddr is explicitly excluded from the hash here in order to allow for | ||
* backend selection to choose the same backend even on different service VIPs. | ||
*/ | ||
// static __always_inline __u32 hash_from_tuple_v4(struct bpf_sock * sk) | ||
// { | ||
|
||
// BPF_LOG(INFO, CLUSTER, "sk: src_ip is:%x, src_port is:%d\n", sk->src_ip4,sk->src_port); | ||
// BPF_LOG(INFO, CLUSTER, "sk: dst_port is:%d, sk->protocol is: %x\n",sk->dst_port,sk->protocol); | ||
// return jhash_3words(sk->src_ip4, | ||
// ((__u32) sk->dst_port << 16) | sk->src_port, | ||
// sk->protocol, HASH_INIT4_SEED); | ||
// } | ||
|
||
static __always_inline __u32 map_array_get_32(__u32 *array, __u32 index, const __u32 limit) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add a source ref. |
||
{ | ||
__u32 datum = 0; | ||
|
||
// if (__builtin_constant_p(index) || | ||
// !__builtin_constant_p(limit)) | ||
// __throw_build_bug(); | ||
|
||
/* LLVM tends to optimize code away that is needed for the verifier to | ||
* understand dynamic map access. Input constraint is that index < limit | ||
* for this util function, so we never fail here, and returned datum is | ||
* always valid. | ||
*/ | ||
asm volatile("%[index] <<= 2\n\t" | ||
"if %[index] > %[limit] goto +1\n\t" | ||
"%[array] += %[index]\n\t" | ||
"%[datum] = *(u32 *)(%[array] + 0)\n\t" | ||
: [datum] "=r"(datum) | ||
: [limit] "i"(limit), [array] "r"(array), [index] "r"(index) | ||
: /* no clobbers */); | ||
|
||
return datum; | ||
} | ||
|
||
static inline void *loadbalance_maglev_select_backend(struct cluster_endpoints *eps, char *name, ctx_buff_t *ctx) | ||
{ | ||
if (!eps || eps->ep_num == 0) | ||
return NULL; | ||
|
||
__u32 inner_key = 0; | ||
__u32 *backend_ids; | ||
__u32 index; | ||
__u32 id; | ||
__u32 *inner_of_maglev; | ||
struct bpf_sock_ops *skops; | ||
__u32 *hash_ptr; | ||
__u32 hash; | ||
|
||
inner_of_maglev = map_lookup_cluster_inner_map(name); | ||
if (!inner_of_maglev) { | ||
return NULL; | ||
} | ||
backend_ids = bpf_map_lookup_elem(inner_of_maglev, &inner_key); | ||
if (!backend_ids) { | ||
return NULL; | ||
} | ||
hash_ptr = map_lookup_lb_hash(); | ||
if (!hash_ptr || *hash_ptr == 0) { | ||
hash = bpf_get_prandom_u32(); | ||
}else { | ||
hash = *hash_ptr; | ||
BPF_LOG(INFO, CLUSTER, "lb_policy is maglev, got a hash value:%u\n", hash); | ||
} | ||
index = hash % MAGLEV_TABLE_SIZE; | ||
if (index >= MAGLEV_TABLE_SIZE) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will |
||
return NULL; | ||
id = map_array_get_32(backend_ids, index, MAGLEV_TABLE_SIZE); | ||
BPF_LOG(INFO, CLUSTER, "lb_policy is maglev, select backend id:%u\n", id); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: I think this log is unnecessary. |
||
if (id >= KMESH_PER_ENDPOINT_NUM) | ||
return NULL; | ||
|
||
return (void *)eps->ep_identity[id]; | ||
} | ||
|
||
static inline void * | ||
cluster_get_ep_identity_by_lb_policy(struct cluster_endpoints *eps, __u32 lb_policy, char *name, ctx_buff_t *ctx) | ||
{ | ||
void *ep_identity = NULL; | ||
|
||
switch (lb_policy) { | ||
case CLUSTER__CLUSTER__LB_POLICY__ROUND_ROBIN: | ||
ep_identity = loadbalance_round_robin(eps); | ||
break; | ||
case CLUSTER__CLUSTER__LB_POLICY__MAGLEV: | ||
ep_identity = loadbalance_maglev_select_backend(eps, name, ctx); | ||
break; | ||
default: | ||
BPF_LOG(INFO, CLUSTER, "%d lb_policy is unsupported, default:ROUND_ROBIN\n", lb_policy); | ||
ep_identity = loadbalance_round_robin(eps); | ||
|
@@ -254,7 +363,8 @@ static inline Core__SocketAddress *cluster_get_ep_sock_addr(const void *ep_ident | |
return sock_addr; | ||
} | ||
|
||
static inline int cluster_handle_loadbalance(Cluster__Cluster *cluster, address_t *addr, ctx_buff_t *ctx) | ||
static inline int | ||
cluster_handle_loadbalance(Cluster__Cluster *cluster, address_t *addr, ctx_buff_t *ctx, const char *cluster_name) | ||
{ | ||
char *name = NULL; | ||
void *ep_identity = NULL; | ||
|
@@ -273,7 +383,7 @@ static inline int cluster_handle_loadbalance(Cluster__Cluster *cluster, address_ | |
return -EAGAIN; | ||
} | ||
|
||
ep_identity = cluster_get_ep_identity_by_lb_policy(eps, cluster->lb_policy); | ||
ep_identity = cluster_get_ep_identity_by_lb_policy(eps, cluster->lb_policy, cluster_name, ctx); | ||
if (!ep_identity) { | ||
BPF_LOG(ERR, CLUSTER, "cluster=\"%s\" handle lb failed\n", name); | ||
return -EAGAIN; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -69,6 +69,13 @@ static inline char *bpf_strncpy(char *dst, int n, const char *src) | |
} | ||
#endif | ||
|
||
struct { | ||
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY); | ||
__uint(key_size, sizeof(int)); | ||
__uint(value_size, sizeof(__u32)); | ||
__uint(max_entries, 1); | ||
} map_of_lb_hash SEC(".maps"); | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add a describe comment to this map. |
||
typedef enum { | ||
KMESH_TAIL_CALL_LISTENER = 1, | ||
KMESH_TAIL_CALL_FILTER_CHAIN, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -334,6 +334,89 @@ static inline char *route_get_cluster(const Route__Route *route) | |
return kmesh_get_ptr_val(_(route_act->cluster)); | ||
} | ||
|
||
static inline __u32 lb_hash_based_header(unsigned char *header_name) | ||
{ | ||
__u32 hash = 0; | ||
struct bpf_mem_ptr *msg_header = NULL; | ||
char *msg_header_v = NULL; | ||
__u32 msg_header_len = 0; | ||
__u32 k; | ||
__u32 c; | ||
char msg_header_cp[KMESH_PER_HASH_POLICY_MSG_HEADER_LEN] = {'\0'}; | ||
|
||
if (!header_name) | ||
return hash; | ||
// when header name is not null, compute a hash value. | ||
BPF_LOG(INFO, ROUTER_CONFIG, "Got a header name:%s\n", header_name); | ||
msg_header = (struct bpf_mem_ptr *)bpf_get_msg_header_element(header_name); | ||
if (!msg_header) | ||
return hash; | ||
|
||
msg_header_v = _(msg_header->ptr); | ||
msg_header_len = _(msg_header->size); | ||
if (!msg_header_len) | ||
return hash; | ||
|
||
BPF_LOG(INFO, ROUTER_CONFIG, "Got a header value len:%u\n", msg_header_len); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. remove or change to debug level |
||
if (!bpf_strncpy(msg_header_cp, msg_header_len, msg_header_v)) | ||
return hash; | ||
BPF_LOG(INFO, ROUTER_CONFIG, "Got a header value:%s\n", msg_header_cp); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto |
||
hash = 5318; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 5318 mean ? |
||
#pragma unroll | ||
for (k = 0; k < KMESH_PER_HASH_POLICY_MSG_HEADER_LEN; k++) { | ||
if (!msg_header_v || k >= msg_header_len) | ||
break; | ||
c = *((unsigned char *)msg_header_cp + k); | ||
hash = ((hash << 5) + hash) + c; | ||
} | ||
|
||
return hash; | ||
} | ||
|
||
static inline int map_add_lb_hash(__u32 hash) | ||
{ | ||
int location = 0; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems that key conflicts exist. different sockets use the same key, and records may be overwritten. |
||
return kmesh_map_update_elem(&map_of_lb_hash, &location, &hash); | ||
} | ||
|
||
static void consistent_hash_policy(const Route__Route *route) | ||
{ | ||
Route__RouteAction *route_act = NULL; | ||
void *hash_policy_ptr = NULL; | ||
__u32 i; | ||
Route__RouteAction__HashPolicy *hash_policy; | ||
char *header_name = NULL; | ||
Route__RouteAction__HashPolicy__Header *header; | ||
uint32_t lb_hash = 0; | ||
|
||
route_act = kmesh_get_ptr_val(_(route->route)); | ||
if (route_act) { | ||
hash_policy_ptr = kmesh_get_ptr_val(_(route_act->hash_policy)); | ||
} | ||
|
||
#pragma unroll | ||
for (i = 0; i < KMESH_PER_HASH_POLICY_NUM; i++) { | ||
if (hash_policy_ptr == NULL) { | ||
break; | ||
} | ||
hash_policy = (Route__RouteAction__HashPolicy *)kmesh_get_ptr_val((void *)*((__u64 *)hash_policy_ptr + i)); | ||
if (!hash_policy) | ||
break; | ||
if (hash_policy->policy_specifier_case == ROUTE__ROUTE_ACTION__HASH_POLICY__POLICY_SPECIFIER_HEADER) { | ||
header = kmesh_get_ptr_val(_(hash_policy->header)); | ||
if (!header) | ||
continue; | ||
header_name = kmesh_get_ptr_val(_(header->header_name)); | ||
break; | ||
} | ||
} | ||
lb_hash = lb_hash_based_header(header_name); | ||
if (map_add_lb_hash(lb_hash) != 0) { | ||
BPF_LOG(ERR, ROUTER_CONFIG, "failed to update cluster lb hash value\n"); | ||
} | ||
BPF_LOG(INFO, ROUTER_CONFIG, "Got a lb hash:%u\n", lb_hash); | ||
} | ||
|
||
SEC_TAIL(KMESH_PORG_CALLS, KMESH_TAIL_CALL_ROUTER_CONFIG) | ||
int route_config_manager(ctx_buff_t *ctx) | ||
{ | ||
|
@@ -373,6 +456,7 @@ int route_config_manager(ctx_buff_t *ctx) | |
} | ||
|
||
cluster = route_get_cluster(route); | ||
consistent_hash_policy(route); | ||
if (!cluster) { | ||
BPF_LOG(ERR, ROUTER_CONFIG, "failed to get cluster\n"); | ||
return KMESH_TAIL_CALL_RET(-1); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is table size set to this?