Skip to content

Commit

Permalink
ads mode support consistent hash
Browse files Browse the repository at this point in the history
Signed-off-by: kangmingfa <[email protected]>
  • Loading branch information
bfforever committed Sep 23, 2024
1 parent 6702e6a commit c82edc5
Show file tree
Hide file tree
Showing 12 changed files with 857 additions and 16 deletions.
1 change: 1 addition & 0 deletions api/cluster/cluster.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ message Cluster {
ROUND_ROBIN = 0;
LEAST_REQUEST = 1;
RANDOM = 3;
MAGLEV = 5;
}

core.ApiStatus api_status = 128;
Expand Down
16 changes: 16 additions & 0 deletions api/route/route_components.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,26 @@ message RouteAction {
// assigned to each cluster.
WeightedCluster weighted_clusters = 3;
}

// Describes how a hash key is obtained from a request. The only policy
// defined in policy_specifier is the header-based one.
message HashPolicy {
// Header-based policy: the hash key comes from one named request header.
message Header {
// The name of the request header that will be used to obtain the hash
// key. If the request header is not present, no hash will be produced.
string header_name = 1;
}

// Exactly one policy kind may be set.
oneof policy_specifier {

// Header hash policy.
Header header = 1;
}
}

// the matched prefix (or path) should be swapped with this value.
string prefix_rewrite = 5;
uint32 timeout = 8;
RetryPolicy retry_policy = 9;
repeated HashPolicy hash_policy = 15;
}

message RetryPolicy {
Expand Down
116 changes: 113 additions & 3 deletions bpf/kmesh/ads/include/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include "endpoint/endpoint.pb-c.h"

#define CLUSTER_NAME_MAX_LEN BPF_DATA_MAX_LEN
/* Number of __u32 slots in one Maglev lookup table. It sizes the inner map
 * value (sizeof(__u32) * MAGLEV_TABLE_SIZE) and is the modulus applied to the
 * hash when a slot is selected. */
#define MAGLEV_TABLE_SIZE 16381
/* Seed for a jhash-based tuple hash; in the code shown here it is referenced
 * only from a commented-out helper. */
#define HASH_INIT4_SEED 0xcafe

struct {
__uint(type, BPF_MAP_TYPE_HASH);
Expand All @@ -20,6 +22,22 @@ struct {
__uint(max_entries, MAP_SIZE_OF_CLUSTER);
} map_of_cluster SEC(".maps");

/*
 * Template for the inner map referenced by outer_of_maglev: an array map with
 * a single entry (key 0) whose value holds MAGLEV_TABLE_SIZE __u32 slots.
 */
struct inner_of_maglev {
__uint(type, BPF_MAP_TYPE_ARRAY);
__uint(max_entries, 1);
__uint(key_size, sizeof(__u32));
/* One value = the whole lookup table. */
__uint(value_size, sizeof(__u32) * MAGLEV_TABLE_SIZE);
};

/*
 * Hash-of-maps keyed by a CLUSTER_NAME_MAX_LEN-byte cluster name. Each value
 * refers to an inner map shaped like struct inner_of_maglev, i.e. one lookup
 * table per cluster.
 */
struct {
__uint(type, BPF_MAP_TYPE_HASH_OF_MAPS);
__uint(key_size, CLUSTER_NAME_MAX_LEN);
__uint(value_size, sizeof(__u32));
__uint(max_entries, MAP_SIZE_OF_CLUSTER);
__uint(map_flags, BPF_F_NO_PREALLOC);
/* Declares the inner map template for this map-in-map. */
__array(values, struct inner_of_maglev);
} outer_of_maglev SEC(".maps");

struct cluster_endpoints {
__u32 ep_num;
/* */
Expand Down Expand Up @@ -63,6 +81,16 @@ static inline struct cluster_endpoints *map_lookup_cluster_eps(const char *clust
return kmesh_map_lookup_elem(&map_of_cluster_eps, cluster_name);
}

/*
 * Look up the entry stored for @cluster_name in outer_of_maglev.
 *
 * Returns the result of kmesh_map_lookup_elem unchanged. The caller in this
 * header treats a null result as "no table for this cluster" and otherwise
 * passes the result to bpf_map_lookup_elem as the map to read from.
 */
static inline __u32 *map_lookup_cluster_inner_map(const char *cluster_name)
{
    __u32 *inner_map = kmesh_map_lookup_elem(&outer_of_maglev, cluster_name);

    return inner_map;
}

/*
 * Fetch a pointer to the load-balancing hash stored at key 0 of
 * map_of_lb_hash.
 *
 * Returns the result of kmesh_map_lookup_elem; callers must check it for NULL
 * before dereferencing.
 *
 * The parameter list is spelled (void) so that the declaration is a real
 * prototype and stray arguments are rejected at compile time.
 */
static inline __u32 *map_lookup_lb_hash(void)
{
    int location = 0;

    return kmesh_map_lookup_elem(&map_of_lb_hash, &location);
}

static inline int map_add_cluster_eps(const char *cluster_name, const struct cluster_endpoints *eps)
{
return kmesh_map_update_elem(&map_of_cluster_eps, cluster_name, eps);
Expand Down Expand Up @@ -219,14 +247,95 @@ static inline void *loadbalance_round_robin(struct cluster_endpoints *eps)
return (void *)eps->ep_identity[idx];
}

static inline void *cluster_get_ep_identity_by_lb_policy(struct cluster_endpoints *eps, __u32 lb_policy)
/* The daddr is explicitly excluded from the hash here in order to allow for
* backend selection to choose the same backend even on different service VIPs.
*/
// static __always_inline __u32 hash_from_tuple_v4(struct bpf_sock * sk)
// {

// BPF_LOG(INFO, CLUSTER, "sk: src_ip is:%x, src_port is:%d\n", sk->src_ip4,sk->src_port);
// BPF_LOG(INFO, CLUSTER, "sk: dst_port is:%d, sk->protocol is: %x\n",sk->dst_port,sk->protocol);
// return jhash_3words(sk->src_ip4,
// ((__u32) sk->dst_port << 16) | sk->src_port,
// sk->protocol, HASH_INIT4_SEED);
// }

/*
 * Read one __u32 from @array using inline BPF assembly.
 *
 * The assembly first shifts @index left by 2, turning the element index into
 * a byte offset. It then compares that byte offset with @limit:
 *   - if the byte offset is greater than @limit, the next instruction (the
 *     pointer add) is skipped and the load reads *(array + 0);
 *   - otherwise the byte offset is added to @array and that slot is loaded.
 *
 * @array: base of the __u32 table to read from.
 * @index: element index; converted to a byte offset inside the assembly.
 * @limit: largest byte offset that may be added to @array. It is bound with
 *         the "i" (immediate) constraint, so it has to be a compile-time
 *         constant. Note that it is compared in bytes, not in elements.
 *
 * Returns the loaded value.
 *
 * NOTE(review): @index and @array are listed as input-only operands although
 * the assembly modifies both — confirm this is acceptable for the compilers
 * this header is built with.
 */
static __always_inline __u32 map_array_get_32(__u32 *array, __u32 index, const __u32 limit)
{
__u32 datum = 0;

/* Disabled build-time check: it would reject a constant index or a
 * non-constant limit. */
// if (__builtin_constant_p(index) ||
// !__builtin_constant_p(limit))
// __throw_build_bug();

/* LLVM tends to optimize code away that is needed for the verifier to
 * understand dynamic map access, hence the hand-written assembly. The
 * intended precondition is that the byte offset (index << 2) does not
 * exceed limit; when it does, the add is skipped and slot 0 is returned
 * instead of the requested slot.
 */
asm volatile("%[index] <<= 2\n\t"
"if %[index] > %[limit] goto +1\n\t"
"%[array] += %[index]\n\t"
"%[datum] = *(u32 *)(%[array] + 0)\n\t"
: [datum] "=r"(datum)
: [limit] "i"(limit), [array] "r"(array), [index] "r"(index)
: /* no clobbers */);

return datum;
}

/*
 * Pick an endpoint for a cluster through its Maglev lookup table.
 *
 * @eps:  endpoints of the cluster; NULL or an empty set yields NULL.
 * @name: cluster name used as the key into outer_of_maglev.
 * @ctx:  currently unused; kept so the signature matches the caller.
 *
 * Steps:
 *   1. find the cluster's inner map and read its single value (the table);
 *   2. take the hash published in map_of_lb_hash; when it is missing or 0,
 *      fall back to a pseudo-random number;
 *   3. reduce the hash modulo MAGLEV_TABLE_SIZE and read that table slot;
 *   4. use the slot content as an index into eps->ep_identity.
 *
 * Returns eps->ep_identity[id] cast to void *, or NULL when any lookup fails
 * or the slot content is not below KMESH_PER_ENDPOINT_NUM.
 */
static inline void *loadbalance_maglev_select_backend(struct cluster_endpoints *eps, const char *name, ctx_buff_t *ctx)
{
    __u32 inner_key = 0;
    __u32 *lut_map;
    __u32 *backend_ids;
    __u32 *hash_ptr;
    __u32 hash;
    __u32 index;
    __u32 id;

    if (!eps || eps->ep_num == 0)
        return NULL;

    lut_map = map_lookup_cluster_inner_map(name);
    if (!lut_map) {
        return NULL;
    }
    backend_ids = bpf_map_lookup_elem(lut_map, &inner_key);
    if (!backend_ids) {
        return NULL;
    }

    hash_ptr = map_lookup_lb_hash();
    if (!hash_ptr || *hash_ptr == 0) {
        /* No hash was published for this request: pick a slot at random. */
        hash = bpf_get_prandom_u32();
    } else {
        hash = *hash_ptr;
        BPF_LOG(INFO, CLUSTER, "lb_policy is maglev, got a hash value:%u\n", hash);
    }

    index = hash % MAGLEV_TABLE_SIZE;
    /* Redundant after the modulo, but keeps the bound explicit. */
    if (index >= MAGLEV_TABLE_SIZE)
        return NULL;

    /*
     * map_array_get_32 compares its limit with the BYTE offset (index << 2),
     * so the limit must be the byte offset of the last slot. Passing the
     * element count made every index above 4095 read slot 0.
     */
    id = map_array_get_32(backend_ids, index, (MAGLEV_TABLE_SIZE - 1) << 2);
    BPF_LOG(INFO, CLUSTER, "lb_policy is maglev, select backend id:%u\n", id);
    if (id >= KMESH_PER_ENDPOINT_NUM)
        return NULL;

    return (void *)eps->ep_identity[id];
}

static inline void *
cluster_get_ep_identity_by_lb_policy(struct cluster_endpoints *eps, __u32 lb_policy, char *name, ctx_buff_t *ctx)
{
void *ep_identity = NULL;

switch (lb_policy) {
case CLUSTER__CLUSTER__LB_POLICY__ROUND_ROBIN:
ep_identity = loadbalance_round_robin(eps);
break;
case CLUSTER__CLUSTER__LB_POLICY__MAGLEV:
ep_identity = loadbalance_maglev_select_backend(eps, name, ctx);
break;
default:
BPF_LOG(INFO, CLUSTER, "%d lb_policy is unsupported, default:ROUND_ROBIN\n", lb_policy);
ep_identity = loadbalance_round_robin(eps);
Expand Down Expand Up @@ -254,7 +363,8 @@ static inline Core__SocketAddress *cluster_get_ep_sock_addr(const void *ep_ident
return sock_addr;
}

static inline int cluster_handle_loadbalance(Cluster__Cluster *cluster, address_t *addr, ctx_buff_t *ctx)
static inline int
cluster_handle_loadbalance(Cluster__Cluster *cluster, address_t *addr, ctx_buff_t *ctx, const char *cluster_name)
{
char *name = NULL;
void *ep_identity = NULL;
Expand All @@ -273,7 +383,7 @@ static inline int cluster_handle_loadbalance(Cluster__Cluster *cluster, address_
return -EAGAIN;
}

ep_identity = cluster_get_ep_identity_by_lb_policy(eps, cluster->lb_policy);
ep_identity = cluster_get_ep_identity_by_lb_policy(eps, cluster->lb_policy, cluster_name, ctx);
if (!ep_identity) {
BPF_LOG(ERR, CLUSTER, "cluster=\"%s\" handle lb failed\n", name);
return -EAGAIN;
Expand Down
28 changes: 15 additions & 13 deletions bpf/kmesh/ads/include/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,19 @@

// ************
// array len
#define KMESH_NAME_LEN 64
#define KMESH_TYPE_LEN 64
#define KMESH_HOST_LEN 128
#define KMESH_FILTER_CHAINS_LEN 64
#define KMESH_HTTP_DOMAIN_NUM 32
#define KMESH_HTTP_DOMAIN_LEN 128
#define KMESH_PER_FILTER_CHAIN_NUM MAP_SIZE_OF_PER_FILTER_CHAIN
#define KMESH_PER_FILTER_NUM MAP_SIZE_OF_PER_FILTER
#define KMESH_PER_VIRT_HOST_NUM MAP_SIZE_OF_PER_VIRTUAL_HOST
#define KMESH_PER_ROUTE_NUM MAP_SIZE_OF_PER_ROUTE
#define KMESH_PER_ENDPOINT_NUM MAP_SIZE_OF_PER_ENDPOINT
#define KMESH_PER_HEADER_MUM 32
#define KMESH_PER_WEIGHT_CLUSTER_NUM 32
#define KMESH_NAME_LEN 64
#define KMESH_TYPE_LEN 64
#define KMESH_HOST_LEN 128
#define KMESH_FILTER_CHAINS_LEN 64
#define KMESH_HTTP_DOMAIN_NUM 32
#define KMESH_HTTP_DOMAIN_LEN 128
#define KMESH_PER_FILTER_CHAIN_NUM MAP_SIZE_OF_PER_FILTER_CHAIN
#define KMESH_PER_FILTER_NUM MAP_SIZE_OF_PER_FILTER
#define KMESH_PER_VIRT_HOST_NUM MAP_SIZE_OF_PER_VIRTUAL_HOST
#define KMESH_PER_ROUTE_NUM MAP_SIZE_OF_PER_ROUTE
#define KMESH_PER_ENDPOINT_NUM MAP_SIZE_OF_PER_ENDPOINT
#define KMESH_PER_HEADER_MUM 32
#define KMESH_PER_WEIGHT_CLUSTER_NUM 32
/* Upper bound on the hash policies examined per route action (loop bound in
 * consistent_hash_policy). */
#define KMESH_PER_HASH_POLICY_NUM 1
/* Maximum number of header-value bytes folded into the load-balancing hash
 * (loop bound in lb_hash_based_header). */
#define KMESH_PER_HASH_POLICY_MSG_HEADER_LEN 10
#endif // _CONFIG_H_
7 changes: 7 additions & 0 deletions bpf/kmesh/ads/include/kmesh_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ struct {
__uint(map_flags, 0);
} inner_map SEC(".maps");

/*
 * Single-entry per-CPU array holding one __u32 load-balancing hash at key 0.
 */
struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__uint(key_size, sizeof(int));
__uint(value_size, sizeof(__u32));
__uint(max_entries, 1);
} map_of_lb_hash SEC(".maps");

typedef enum {
KMESH_TAIL_CALL_LISTENER = 1,
KMESH_TAIL_CALL_FILTER_CHAIN,
Expand Down
84 changes: 84 additions & 0 deletions bpf/kmesh/ads/include/route_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,89 @@ static inline char *route_get_cluster(const Route__Route *route)
return kmesh_get_ptr_val(_(route_act->cluster));
}

/*
 * Compute a load-balancing hash from the value of one request header.
 *
 * @header_name: name of the header to hash; NULL means "no header policy".
 *
 * Returns 0 when no hash can be produced (no header name, header not found,
 * empty or missing value, or the copy failed). Otherwise returns a
 * multiply-by-33 rolling hash over at most the first
 * KMESH_PER_HASH_POLICY_MSG_HEADER_LEN bytes of the header value.
 */
static inline __u32 lb_hash_based_header(unsigned char *header_name)
{
    __u32 hash = 0;
    struct bpf_mem_ptr *msg_header = NULL;
    char *msg_header_v = NULL;
    __u32 msg_header_len = 0;
    __u32 k;
    __u32 c;
    /* One spare byte keeps the copy NUL-terminated for the %s log below. */
    char msg_header_cp[KMESH_PER_HASH_POLICY_MSG_HEADER_LEN + 1] = {'\0'};

    if (!header_name)
        return hash;
    // when header name is not null, compute a hash value.
    BPF_LOG(INFO, ROUTER_CONFIG, "Got a header name:%s\n", header_name);
    msg_header = (struct bpf_mem_ptr *)bpf_get_msg_header_element(header_name);
    if (!msg_header)
        return hash;

    msg_header_v = _(msg_header->ptr);
    msg_header_len = _(msg_header->size);
    /* Without a value there is nothing to copy or hash. */
    if (!msg_header_v || !msg_header_len)
        return hash;

    BPF_LOG(INFO, ROUTER_CONFIG, "Got a header value len:%u\n", msg_header_len);
    /*
     * Only the first KMESH_PER_HASH_POLICY_MSG_HEADER_LEN bytes take part in
     * the hash, so never ask the copy helper for more than the local buffer
     * can hold. Passing the raw header length could overrun the buffer.
     */
    if (msg_header_len > KMESH_PER_HASH_POLICY_MSG_HEADER_LEN)
        msg_header_len = KMESH_PER_HASH_POLICY_MSG_HEADER_LEN;
    if (!bpf_strncpy(msg_header_cp, msg_header_len, msg_header_v))
        return hash;
    BPF_LOG(INFO, ROUTER_CONFIG, "Got a header value:%s\n", msg_header_cp);

    /* NOTE(review): the classic djb2 seed is 5381; 5318 is kept as-is so that
     * existing hash values do not change. */
    hash = 5318;
#pragma unroll
    for (k = 0; k < KMESH_PER_HASH_POLICY_MSG_HEADER_LEN; k++) {
        if (k >= msg_header_len)
            break;
        c = *((unsigned char *)msg_header_cp + k);
        hash = ((hash << 5) + hash) + c; /* hash * 33 + c */
    }

    return hash;
}

/*
 * Store @hash at key 0 of map_of_lb_hash.
 *
 * Returns the status reported by kmesh_map_update_elem; the caller in this
 * file treats any non-zero value as failure.
 */
static inline int map_add_lb_hash(__u32 hash)
{
    int slot = 0;
    int status = kmesh_map_update_elem(&map_of_lb_hash, &slot, &hash);

    return status;
}

/*
 * Derive the load-balancing hash for @route and store it via map_add_lb_hash.
 *
 * Scans up to KMESH_PER_HASH_POLICY_NUM hash policies of the route action and
 * takes the header name from the first header-type policy that has a header
 * message. The name (possibly NULL) is hashed with lb_hash_based_header and
 * the result is always written, so a route without a usable policy stores 0.
 */
static void consistent_hash_policy(const Route__Route *route)
{
    Route__RouteAction *action = NULL;
    void *policies = NULL;
    char *hdr_name = NULL;
    uint32_t hash_value = 0;
    __u32 idx;

    action = kmesh_get_ptr_val(_(route->route));
    if (action != NULL)
        policies = kmesh_get_ptr_val(_(action->hash_policy));

#pragma unroll
    for (idx = 0; idx < KMESH_PER_HASH_POLICY_NUM; idx++) {
        Route__RouteAction__HashPolicy *policy;
        Route__RouteAction__HashPolicy__Header *hdr;

        if (!policies)
            break;

        /* policies is read as an array of 64-bit encoded pointers. */
        policy = (Route__RouteAction__HashPolicy *)kmesh_get_ptr_val((void *)*((__u64 *)policies + idx));
        if (policy == NULL)
            break;

        /* Only header-based policies are handled. */
        if (policy->policy_specifier_case != ROUTE__ROUTE_ACTION__HASH_POLICY__POLICY_SPECIFIER_HEADER)
            continue;

        hdr = kmesh_get_ptr_val(_(policy->header));
        if (hdr) {
            hdr_name = kmesh_get_ptr_val(_(hdr->header_name));
            break;
        }
    }

    hash_value = lb_hash_based_header(hdr_name);
    if (map_add_lb_hash(hash_value) != 0) {
        BPF_LOG(ERR, ROUTER_CONFIG, "failed to update cluster lb hash value\n");
    }
    BPF_LOG(INFO, ROUTER_CONFIG, "Got a lb hash:%u\n", hash_value);
}

SEC_TAIL(KMESH_PORG_CALLS, KMESH_TAIL_CALL_ROUTER_CONFIG)
int route_config_manager(ctx_buff_t *ctx)
{
Expand Down Expand Up @@ -373,6 +456,7 @@ int route_config_manager(ctx_buff_t *ctx)
}

cluster = route_get_cluster(route);
consistent_hash_policy(route);
if (!cluster) {
BPF_LOG(ERR, ROUTER_CONFIG, "failed to get cluster\n");
return KMESH_TAIL_CALL_RET(-1);
Expand Down
7 changes: 7 additions & 0 deletions pkg/bpf/bpf.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"kmesh.net/kmesh/daemon/options"
"kmesh.net/kmesh/pkg/logger"
"kmesh.net/kmesh/pkg/version"
"kmesh.net/kmesh/pkg/consistenthash/maglev"
)

var (
Expand Down Expand Up @@ -97,6 +98,12 @@ func (l *BpfLoader) StartAdsMode() (err error) {
l.Stop()
return fmt.Errorf("deserial_init failed:%v", ret)
}

if err = maglev.InitMaglevMap(); err != nil {
l.Stop()
return fmt.Errorf("lb maglev init failed, %s", err)
}

return nil
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/cache/v2/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
cluster_v2 "kmesh.net/kmesh/api/v2/cluster"
core_v2 "kmesh.net/kmesh/api/v2/core"
maps_v2 "kmesh.net/kmesh/pkg/cache/v2/maps"
"kmesh.net/kmesh/pkg/consistenthash/maglev"
)

type ClusterCache struct {
Expand Down Expand Up @@ -129,6 +130,12 @@ func (cache *ClusterCache) Flush() {
for name, cluster := range cache.apiClusterCache {
if cluster.GetApiStatus() == core_v2.ApiStatus_UPDATE {
err := maps_v2.ClusterUpdate(name, cluster)
if cluster.GetLbPolicy() == cluster_v2.Cluster_MAGLEV {
// create consistent lb here
if err := maglev.CreateLB(cluster);err != nil {
log.Errorf("maglev lb update %v cluster failed: %v",name, err)
}
}
if err == nil {
// reset api status after successfully updated
cluster.ApiStatus = core_v2.ApiStatus_NONE
Expand Down
Loading

0 comments on commit c82edc5

Please sign in to comment.