diff --git a/api/cluster/cluster.proto b/api/cluster/cluster.proto index bf5048c45..f75e24b10 100644 --- a/api/cluster/cluster.proto +++ b/api/cluster/cluster.proto @@ -12,6 +12,7 @@ message Cluster { ROUND_ROBIN = 0; LEAST_REQUEST = 1; RANDOM = 3; + MAGLEV = 5; } core.ApiStatus api_status = 128; diff --git a/api/route/route_components.proto b/api/route/route_components.proto index 1ae076f8a..6cfe3b250 100644 --- a/api/route/route_components.proto +++ b/api/route/route_components.proto @@ -30,10 +30,26 @@ message RouteAction { // assigned to each cluster. WeightedCluster weighted_clusters = 3; } + + message HashPolicy { + message Header { + // The name of the request header that will be used to obtain the hash + // key. If the request header is not present, no hash will be produced. + string header_name = 1; + } + + oneof policy_specifier { + + // Header hash policy. + Header header = 1; + } + } + // the matched prefix (or path) should be swapped with this value. string prefix_rewrite = 5; uint32 timeout = 8; RetryPolicy retry_policy = 9; + repeated HashPolicy hash_policy = 15; } message RetryPolicy { diff --git a/bpf/kmesh/ads/include/cluster.h b/bpf/kmesh/ads/include/cluster.h index e5fd4a358..6aa834a42 100644 --- a/bpf/kmesh/ads/include/cluster.h +++ b/bpf/kmesh/ads/include/cluster.h @@ -11,6 +11,8 @@ #include "endpoint/endpoint.pb-c.h" #define CLUSTER_NAME_MAX_LEN BPF_DATA_MAX_LEN +#define MAGLEV_TABLE_SIZE 16381 +#define HASH_INIT4_SEED 0xcafe struct { __uint(type, BPF_MAP_TYPE_HASH); @@ -20,6 +22,22 @@ struct { __uint(max_entries, MAP_SIZE_OF_CLUSTER); } map_of_cluster SEC(".maps"); +struct inner_of_maglev { + __uint(type, BPF_MAP_TYPE_ARRAY); + __uint(max_entries, 1); + __uint(key_size, sizeof(__u32)); + __uint(value_size, sizeof(__u32) * MAGLEV_TABLE_SIZE); +}; + +struct { + __uint(type, BPF_MAP_TYPE_HASH_OF_MAPS); + __uint(key_size, CLUSTER_NAME_MAX_LEN); + __uint(value_size, sizeof(__u32)); + __uint(max_entries, MAP_SIZE_OF_CLUSTER); + __uint(map_flags, BPF_F_NO_PREALLOC); + __array(values, struct inner_of_maglev); +} outer_of_maglev SEC(".maps"); + struct cluster_endpoints { __u32 ep_num; /* */ @@ -63,6 +81,16 @@ static inline struct cluster_endpoints *map_lookup_cluster_eps(const char *clust return kmesh_map_lookup_elem(&map_of_cluster_eps, cluster_name); } +static inline __u32 *map_lookup_cluster_inner_map(const char *cluster_name) +{ + return kmesh_map_lookup_elem(&outer_of_maglev, cluster_name); +} + +static inline __u32 *map_lookup_lb_hash() { + int location = 0; + return kmesh_map_lookup_elem(&map_of_lb_hash,&location); +} + static inline int map_add_cluster_eps(const char *cluster_name, const struct cluster_endpoints *eps) { return kmesh_map_update_elem(&map_of_cluster_eps, cluster_name, eps); @@ -219,7 +247,85 @@ static inline void *loadbalance_round_robin(struct cluster_endpoints *eps) return (void *)eps->ep_identity[idx]; } -static inline void *cluster_get_ep_identity_by_lb_policy(struct cluster_endpoints *eps, __u32 lb_policy) +/* The daddr is explicitly excluded from the hash here in order to allow for + * backend selection to choose the same backend even on different service VIPs. 
+ */ +// static __always_inline __u32 hash_from_tuple_v4(struct bpf_sock * sk) +// { + +// BPF_LOG(INFO, CLUSTER, "sk: src_ip is:%x, src_port is:%d\n", sk->src_ip4,sk->src_port); +// BPF_LOG(INFO, CLUSTER, "sk: dst_port is:%d, sk->protocol is: %x\n",sk->dst_port,sk->protocol); +// return jhash_3words(sk->src_ip4, +// ((__u32) sk->dst_port << 16) | sk->src_port, +// sk->protocol, HASH_INIT4_SEED); +// } + +static __always_inline __u32 map_array_get_32(__u32 *array, __u32 index, const __u32 limit) +{ + __u32 datum = 0; + + // if (__builtin_constant_p(index) || + // !__builtin_constant_p(limit)) + // __throw_build_bug(); + + /* LLVM tends to optimize code away that is needed for the verifier to + * understand dynamic map access. Input constraint is that index < limit + * for this util function, so we never fail here, and returned datum is + * always valid. + */ + asm volatile("%[index] <<= 2\n\t" + "if %[index] > %[limit] goto +1\n\t" + "%[array] += %[index]\n\t" + "%[datum] = *(u32 *)(%[array] + 0)\n\t" + : [datum] "=r"(datum) + : [limit] "i"(limit), [array] "r"(array), [index] "r"(index) + : /* no clobbers */); + + return datum; +} + +static inline void *loadbalance_maglev_select_backend(struct cluster_endpoints *eps, char *name, ctx_buff_t *ctx) +{ + if (!eps || eps->ep_num == 0) + return NULL; + + __u32 inner_key = 0; + __u32 *backend_ids; + __u32 index; + __u32 id; + __u32 *inner_of_maglev; + struct bpf_sock_ops *skops; + __u32 *hash_ptr; + __u32 hash; + + inner_of_maglev = map_lookup_cluster_inner_map(name); + if (!inner_of_maglev) { + return NULL; + } + backend_ids = bpf_map_lookup_elem(inner_of_maglev, &inner_key); + if (!backend_ids) { + return NULL; + } + hash_ptr = map_lookup_lb_hash(); + if (!hash_ptr || *hash_ptr == 0) { + hash = bpf_get_prandom_u32(); + }else { + hash = *hash_ptr; + BPF_LOG(INFO, CLUSTER, "lb_policy is maglev, got a hash value:%u\n", hash); + } + index = hash % MAGLEV_TABLE_SIZE; + if (index >= MAGLEV_TABLE_SIZE) + return NULL; + id = map_array_get_32(backend_ids, index, MAGLEV_TABLE_SIZE); + BPF_LOG(INFO, CLUSTER, "lb_policy is maglev, select backend id:%u\n", id); + if (id >= KMESH_PER_ENDPOINT_NUM) + return NULL; + + return (void *)eps->ep_identity[id]; +} + +static inline void * +cluster_get_ep_identity_by_lb_policy(struct cluster_endpoints *eps, __u32 lb_policy, char *name, ctx_buff_t *ctx) { void *ep_identity = NULL; @@ -227,6 +333,9 @@ static inline void *cluster_get_ep_identity_by_lb_policy(struct cluster_endpoint case CLUSTER__CLUSTER__LB_POLICY__ROUND_ROBIN: ep_identity = loadbalance_round_robin(eps); break; + case CLUSTER__CLUSTER__LB_POLICY__MAGLEV: + ep_identity = loadbalance_maglev_select_backend(eps, name, ctx); + break; default: BPF_LOG(INFO, CLUSTER, "%d lb_policy is unsupported, default:ROUND_ROBIN\n", lb_policy); ep_identity = loadbalance_round_robin(eps); @@ -254,7 +363,8 @@ static inline Core__SocketAddress *cluster_get_ep_sock_addr(const void *ep_ident return sock_addr; } -static inline int cluster_handle_loadbalance(Cluster__Cluster *cluster, address_t *addr, ctx_buff_t *ctx) +static inline int +cluster_handle_loadbalance(Cluster__Cluster *cluster, address_t *addr, ctx_buff_t *ctx, const char *cluster_name) { char *name = NULL; void *ep_identity = NULL; @@ -273,7 +383,7 @@ static inline int cluster_handle_loadbalance(Cluster__Cluster *cluster, address_ return -EAGAIN; } - ep_identity = cluster_get_ep_identity_by_lb_policy(eps, cluster->lb_policy); + ep_identity = cluster_get_ep_identity_by_lb_policy(eps, cluster->lb_policy, cluster_name, 
ctx); if (!ep_identity) { BPF_LOG(ERR, CLUSTER, "cluster=\"%s\" handle lb failed\n", name); return -EAGAIN; diff --git a/bpf/kmesh/ads/include/config.h b/bpf/kmesh/ads/include/config.h index 714a06a19..a9d2a4913 100644 --- a/bpf/kmesh/ads/include/config.h +++ b/bpf/kmesh/ads/include/config.h @@ -54,17 +54,19 @@ // ************ // array len -#define KMESH_NAME_LEN 64 -#define KMESH_TYPE_LEN 64 -#define KMESH_HOST_LEN 128 -#define KMESH_FILTER_CHAINS_LEN 64 -#define KMESH_HTTP_DOMAIN_NUM 32 -#define KMESH_HTTP_DOMAIN_LEN 128 -#define KMESH_PER_FILTER_CHAIN_NUM MAP_SIZE_OF_PER_FILTER_CHAIN -#define KMESH_PER_FILTER_NUM MAP_SIZE_OF_PER_FILTER -#define KMESH_PER_VIRT_HOST_NUM MAP_SIZE_OF_PER_VIRTUAL_HOST -#define KMESH_PER_ROUTE_NUM MAP_SIZE_OF_PER_ROUTE -#define KMESH_PER_ENDPOINT_NUM MAP_SIZE_OF_PER_ENDPOINT -#define KMESH_PER_HEADER_MUM 32 -#define KMESH_PER_WEIGHT_CLUSTER_NUM 32 +#define KMESH_NAME_LEN 64 +#define KMESH_TYPE_LEN 64 +#define KMESH_HOST_LEN 128 +#define KMESH_FILTER_CHAINS_LEN 64 +#define KMESH_HTTP_DOMAIN_NUM 32 +#define KMESH_HTTP_DOMAIN_LEN 128 +#define KMESH_PER_FILTER_CHAIN_NUM MAP_SIZE_OF_PER_FILTER_CHAIN +#define KMESH_PER_FILTER_NUM MAP_SIZE_OF_PER_FILTER +#define KMESH_PER_VIRT_HOST_NUM MAP_SIZE_OF_PER_VIRTUAL_HOST +#define KMESH_PER_ROUTE_NUM MAP_SIZE_OF_PER_ROUTE +#define KMESH_PER_ENDPOINT_NUM MAP_SIZE_OF_PER_ENDPOINT +#define KMESH_PER_HEADER_MUM 32 +#define KMESH_PER_WEIGHT_CLUSTER_NUM 32 +#define KMESH_PER_HASH_POLICY_NUM 1 +#define KMESH_PER_HASH_POLICY_MSG_HEADER_LEN 10 #endif // _CONFIG_H_ diff --git a/bpf/kmesh/ads/include/kmesh_common.h b/bpf/kmesh/ads/include/kmesh_common.h index 2bb61339d..9f5750f16 100644 --- a/bpf/kmesh/ads/include/kmesh_common.h +++ b/bpf/kmesh/ads/include/kmesh_common.h @@ -69,6 +69,13 @@ static inline char *bpf_strncpy(char *dst, int n, const char *src) } #endif +struct { + __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY); + __uint(key_size, sizeof(int)); + __uint(value_size, sizeof(__u32)); + __uint(max_entries, 1); +} map_of_lb_hash SEC(".maps"); + typedef enum { KMESH_TAIL_CALL_LISTENER = 1, KMESH_TAIL_CALL_FILTER_CHAIN, diff --git a/bpf/kmesh/ads/include/route_config.h b/bpf/kmesh/ads/include/route_config.h index a90c4a153..72883ad8d 100644 --- a/bpf/kmesh/ads/include/route_config.h +++ b/bpf/kmesh/ads/include/route_config.h @@ -334,6 +334,89 @@ static inline char *route_get_cluster(const Route__Route *route) return kmesh_get_ptr_val(_(route_act->cluster)); } +static inline __u32 lb_hash_based_header(unsigned char *header_name) +{ + __u32 hash = 0; + struct bpf_mem_ptr *msg_header = NULL; + char *msg_header_v = NULL; + __u32 msg_header_len = 0; + __u32 k; + __u32 c; + char msg_header_cp[KMESH_PER_HASH_POLICY_MSG_HEADER_LEN] = {'\0'}; + + if (!header_name) + return hash; + // when header name is not null, compute a hash value. 
+ BPF_LOG(INFO, ROUTER_CONFIG, "Got a header name:%s\n", header_name); + msg_header = (struct bpf_mem_ptr *)bpf_get_msg_header_element(header_name); + if (!msg_header) + return hash; + + msg_header_v = _(msg_header->ptr); + msg_header_len = _(msg_header->size); + if (!msg_header_len) + return hash; + + BPF_LOG(INFO, ROUTER_CONFIG, "Got a header value len:%u\n", msg_header_len); + if (!bpf_strncpy(msg_header_cp, msg_header_len, msg_header_v)) + return hash; + BPF_LOG(INFO, ROUTER_CONFIG, "Got a header value:%s\n", msg_header_cp); + hash = 5318; +#pragma unroll + for (k = 0; k < KMESH_PER_HASH_POLICY_MSG_HEADER_LEN; k++) { + if (!msg_header_v || k >= msg_header_len) + break; + c = *((unsigned char *)msg_header_cp + k); + hash = ((hash << 5) + hash) + c; + } + + return hash; +} + +static inline int map_add_lb_hash(__u32 hash) +{ + int location = 0; + return kmesh_map_update_elem(&map_of_lb_hash, &location, &hash); +} + +static void consistent_hash_policy(const Route__Route *route) +{ + Route__RouteAction *route_act = NULL; + void *hash_policy_ptr = NULL; + __u32 i; + Route__RouteAction__HashPolicy *hash_policy; + char *header_name = NULL; + Route__RouteAction__HashPolicy__Header *header; + uint32_t lb_hash = 0; + + route_act = kmesh_get_ptr_val(_(route->route)); + if (route_act) { + hash_policy_ptr = kmesh_get_ptr_val(_(route_act->hash_policy)); + } + +#pragma unroll + for (i = 0; i < KMESH_PER_HASH_POLICY_NUM; i++) { + if (hash_policy_ptr == NULL) { + break; + } + hash_policy = (Route__RouteAction__HashPolicy *)kmesh_get_ptr_val((void *)*((__u64 *)hash_policy_ptr + i)); + if (!hash_policy) + break; + if (hash_policy->policy_specifier_case == ROUTE__ROUTE_ACTION__HASH_POLICY__POLICY_SPECIFIER_HEADER) { + header = kmesh_get_ptr_val(_(hash_policy->header)); + if (!header) + continue; + header_name = kmesh_get_ptr_val(_(header->header_name)); + break; + } + } + lb_hash = lb_hash_based_header(header_name); + if (map_add_lb_hash(lb_hash) != 0) { + BPF_LOG(ERR, ROUTER_CONFIG, "failed to update cluster lb hash value\n"); + } + BPF_LOG(INFO, ROUTER_CONFIG, "Got a lb hash:%u\n", lb_hash); +} + SEC_TAIL(KMESH_PORG_CALLS, KMESH_TAIL_CALL_ROUTER_CONFIG) int route_config_manager(ctx_buff_t *ctx) { @@ -373,6 +456,7 @@ int route_config_manager(ctx_buff_t *ctx) } cluster = route_get_cluster(route); + consistent_hash_policy(route); if (!cluster) { BPF_LOG(ERR, ROUTER_CONFIG, "failed to get cluster\n"); return KMESH_TAIL_CALL_RET(-1); diff --git a/docs/proposal/consistent_hash.md b/docs/proposal/consistent_hash.md new file mode 100644 index 000000000..2b7f3d51f --- /dev/null +++ b/docs/proposal/consistent_hash.md @@ -0,0 +1,171 @@ +--- +title: Consistent hash LB +authors: +- "@bfforever" # Authors' GitHub accounts here. +reviewers: +- "@supercharge-xsy" +- "@hzxuzhonghu" +- "@nlwcy" +- TBD +approvers: +- "@robot" +- TBD + +creation-date: 2024-05-29 + +--- + +## Your short, descriptive title + +Add Consistent hash based LB in ebpf prog + +### Summary + +
    +
1. Maglev consistent hash algorithm
2. IP tuple info based hash
3. L7 HTTP header based hash
+ +#### Goals + +
    +
1. Networking load balancing
2. Guarantee that a client connection always sends its requests to the same backend
3. Minimize remapping when backends are added or removed
+ +### Proposal + +#### maglev +Algorithm main purpose: map all backends into a fixed look up table, require even mapping, and try to minimize mapping changes when backend change (remove or add a backend), avoid global remapping. + + +#### ebpf implement situation or point +
    +
1. Cgroup level: implement L4 and L7 header based consistent hashing at the cgroup socket hooks.
2. Container veth TC level: implement L4 and L7 based consistent hashing at the TC layer.
+ + +### Design Details + +![consistent_hash](pics/kmesh_consistent_hash.svg) +#### maglev +main steps: + +
    +
1. Map the endpoints of a Cluster to a fixed table: based on the Cluster config pushed by istiod, generate an integer lookup table whose values are endpoint indexes (see the population sketch after this list).
2. Compute a hash value from L4 or L7 info and use it to pick a table index: index = hash % len(table).
3. Use the index to access the table: ep = table[index].
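To make the population step concrete, here is a minimal Go sketch of the Maglev table build. The names (`buildTable`, `offsetSkip`) and the use of FNV as a stand-in for the murmur3-based hash are assumptions for illustration only; this is not the code added in this PR.

```
package main

import (
	"fmt"
	"hash/fnv"
)

const tableSize = 16381 // prime, so every skip value walks all slots

// offsetSkip derives the per-backend permutation parameters from its address.
// FNV stands in here for the murmur3-based hash used by the proposal.
func offsetSkip(addr string) (offset, skip uint64) {
	h := fnv.New64a()
	h.Write([]byte(addr))
	sum := h.Sum64()
	offset = sum % tableSize
	skip = (sum/tableSize)%(tableSize-1) + 1
	return
}

// buildTable fills a lookup table of endpoint indexes following the Maglev
// population loop: backends take turns claiming their next preferred slot.
func buildTable(backends []string) []int {
	table := make([]int, tableSize)
	for i := range table {
		table[i] = -1
	}
	offsets := make([]uint64, len(backends))
	skips := make([]uint64, len(backends))
	next := make([]uint64, len(backends))
	for i, b := range backends {
		offsets[i], skips[i] = offsetSkip(b)
	}
	filled := 0
	for filled < tableSize {
		for i := range backends {
			// advance backend i to its next unclaimed slot
			c := (offsets[i] + skips[i]*next[i]) % tableSize
			for table[c] >= 0 {
				next[i]++
				c = (offsets[i] + skips[i]*next[i]) % tableSize
			}
			table[c] = i
			next[i]++
			filled++
			if filled == tableSize {
				break
			}
		}
	}
	return table
}

func main() {
	table := buildTable([]string{"10.0.0.1:5000", "10.0.0.2:5000", "10.0.0.3:5000"})
	fmt.Println("first slots:", table[:8]) // endpoint index per slot
}
```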
+ + +##### maglev lookup table +Use a compact table, table entry is bit width of endpoint count. +For the table size default is 16381. + +#### L4 implement +
    +
1. (saddr, daddr, sport, dport, protocol) --> hash
2. Then execute the second step above, index = hash % len(table) (see the sketch after this list).
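A small Go sketch of this flow, for illustration only: the helper name `tupleIndex` is made up here, and FNV merely stands in for whatever hash the eBPF side ends up using.

```
package main

import (
	"encoding/binary"
	"fmt"
	"hash/fnv"
)

const tableSize = 16381

// tupleIndex hashes the connection 5-tuple and maps it onto the Maglev table.
func tupleIndex(saddr, daddr uint32, sport, dport uint16, proto uint8) uint32 {
	buf := make([]byte, 13)
	binary.BigEndian.PutUint32(buf[0:], saddr)
	binary.BigEndian.PutUint32(buf[4:], daddr)
	binary.BigEndian.PutUint16(buf[8:], sport)
	binary.BigEndian.PutUint16(buf[10:], dport)
	buf[12] = proto
	h := fnv.New32a()
	h.Write(buf)
	return h.Sum32() % tableSize // second step: index = hash % len(table)
}

func main() {
	idx := tupleIndex(0x0a000001, 0x0a000002, 40000, 5000, 6)
	fmt.Println("table index:", idx)
}
```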
+ +##### risk in l4 implement +In cgroup level, because of lacking saddr and sport, can't do l4 lb effectively. + +By the advice of @supercharge-xsy, add (net namespace cookie + daddr + dport ) to compute hash value. + + +#### L7 implement +``` +apiVersion: networking.istio.io/v1alpha3 +kind: DestinationRule +metadata: + name: helloworld +spec: + host: helloworld + trafficPolicy: + loadBalancer: + consistentHash: + httpHeaderName: testHeader + maglev: + tableSize: +``` +
    +
1. Specify which L7 header is used for hashing (the DestinationRule above).
2. Extract that header's value from the HTTP request headers.
3. Compute a hash value based on the header value.
4. Then execute the second step above, index = hash % len(table) (see the sketch after this list).
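The same pipeline as a userspace Go sketch, for illustration only: the `net/http` header lookup and the classic djb2 seed 5381 are assumptions here, while the eBPF sketch below uses its own copy loop and seed.

```
package main

import (
	"fmt"
	"net/http"
)

const tableSize = 16381

// headerIndex hashes the configured request header with a djb2-style loop
// and maps the result onto the Maglev lookup table.
func headerIndex(h http.Header, name string) uint32 {
	v := h.Get(name)
	if v == "" {
		return 0 // header missing: caller may fall back to a random hash
	}
	var hash uint32 = 5381
	for i := 0; i < len(v); i++ {
		hash = hash*33 + uint32(v[i]) // hash = ((hash << 5) + hash) + c
	}
	return hash % tableSize
}

func main() {
	hdr := http.Header{}
	hdr.Set("testHeader", "user-42")
	fmt.Println("table index:", headerIndex(hdr, "testHeader"))
}
```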
##### Risk in hashing the l7 header value
eBPF programs lack a ready-made string hash helper.
+Below is a simple string-based hash method.Probability we need an effective hash method. +``` +static inline __u32 lb_hash_based_header(unsigned char *header_name) { + __u32 hash = 0; + struct bpf_mem_ptr *msg_header = NULL; + char *msg_header_v = NULL; + __u32 msg_header_len = 0; + __u32 k; + __u32 c; + char msg_header_cp[KMESH_PER_HASH_POLICY_MSG_HEADER_LEN] = {'\0'}; + + if (!header_name) + return hash; + // when header name is not null, compute a hash value. + BPF_LOG(INFO, ROUTER_CONFIG, "Got a header name:%s\n", header_name); + msg_header = (struct bpf_mem_ptr *)bpf_get_msg_header_element(header_name); + if (!msg_header) + return hash; + + msg_header_v = _(msg_header->ptr); + msg_header_len = _(msg_header->size); + if (!msg_header_len) + return hash; + + BPF_LOG(INFO, ROUTER_CONFIG, "Got a header value len:%u\n", msg_header_len); + if (!bpf_strncpy(msg_header_cp, msg_header_len, msg_header_v)) + return hash; + BPF_LOG(INFO, ROUTER_CONFIG, "Got a header value:%s\n", msg_header_cp); + // Currently a simple hash method + hash = 5318; +# pragma unroll + for (k = 0; k < KMESH_PER_HASH_POLICY_MSG_HEADER_LEN;k++) { + if (!msg_header_v || k >= msg_header_len) + break; + c = *((unsigned char *)msg_header_cp + k); + hash = ((hash << 5) + hash) + c; + } + + return hash; +} +``` + + +#### Cgroup level +Directly through lb logic, to change the daddr and dport of socket. + +#### Tc level +Do lb algorithm based on network packet or message, to change the daddr and dport of packet. +
This way the full IP address info is available for doing LB.

#### Test Plan
    +
1. The table mapping is evenly distributed across backends.
2. Adding or removing an endpoint only affects a small fraction of connections (see the sketch after this list).
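A rough Go sketch of how both properties could be measured, assuming two lookup tables produced before and after a backend change; the toy tables and helper names are illustrative, not tests added in this PR.

```
package main

import "fmt"

// distribution counts how many table slots map to each backend index.
func distribution(table []int) map[int]int {
	counts := make(map[int]int)
	for _, id := range table {
		counts[id]++
	}
	return counts
}

// changedSlots counts slots whose owner differs between two tables of equal
// size, i.e. how many flows would be remapped after a backend change.
func changedSlots(before, after []int) int {
	changed := 0
	for i := range before {
		if before[i] != after[i] {
			changed++
		}
	}
	return changed
}

func main() {
	// Toy tables standing in for two lookup-table builds,
	// before and after removing backend 2.
	before := []int{0, 1, 2, 0, 1, 2, 0, 1, 2}
	after := []int{0, 1, 1, 0, 1, 0, 0, 1, 0}
	fmt.Println("per-backend slot counts:", distribution(before))
	fmt.Printf("remapped slots: %d of %d\n", changedSlots(before, after), len(before))
}
```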
+ +#### Ring hash implement reference +https://github.com/envoyproxy/envoy/blob/b0cc284f86e328a33525d6d2b8f6ab00fff240b8/source/extensions/load_balancing_policies/ring_hash/ring_hash_lb.cc#L92 diff --git a/docs/proposal/pics/kmesh_consistent_hash.svg b/docs/proposal/pics/kmesh_consistent_hash.svg new file mode 100644 index 000000000..a40d4b250 --- /dev/null +++ b/docs/proposal/pics/kmesh_consistent_hash.svg @@ -0,0 +1 @@ +
[SVG diagram: istiod pushes endpoints to the Kmesh Daemon, which generates the maglev lookup table from the endpoints and writes it into an inner map of ep_ids; the outer map keys cluster name (192B) to the inner map fd; the ebpf prog lb logic picks a table slot via hash % len(table) and resolves the chosen endpoint's sock_addr.]
\ No newline at end of file diff --git a/pkg/bpf/bpf.go b/pkg/bpf/bpf.go index 726296e05..81945c28e 100644 --- a/pkg/bpf/bpf.go +++ b/pkg/bpf/bpf.go @@ -36,6 +36,7 @@ import ( "kmesh.net/kmesh/pkg/constants" "kmesh.net/kmesh/pkg/logger" "kmesh.net/kmesh/pkg/version" + "kmesh.net/kmesh/pkg/consistenthash/maglev" ) var ( @@ -101,6 +102,12 @@ func (l *BpfLoader) StartAdsMode() (err error) { l.Stop() return fmt.Errorf("deserial_init failed:%v", ret) } + + if err = maglev.InitMaglevMap(); err != nil { + l.Stop() + return fmt.Errorf("lb maglev init failed, %s", err) + } + return nil } diff --git a/pkg/cache/v2/cluster.go b/pkg/cache/v2/cluster.go index 357d5243e..75332f50c 100644 --- a/pkg/cache/v2/cluster.go +++ b/pkg/cache/v2/cluster.go @@ -27,6 +27,7 @@ import ( cluster_v2 "kmesh.net/kmesh/api/v2/cluster" core_v2 "kmesh.net/kmesh/api/v2/core" maps_v2 "kmesh.net/kmesh/pkg/cache/v2/maps" + "kmesh.net/kmesh/pkg/consistenthash/maglev" ) type ClusterCache struct { @@ -129,6 +130,12 @@ func (cache *ClusterCache) Flush() { for name, cluster := range cache.apiClusterCache { if cluster.GetApiStatus() == core_v2.ApiStatus_UPDATE { err := maps_v2.ClusterUpdate(name, cluster) + if cluster.GetLbPolicy() == cluster_v2.Cluster_MAGLEV { + // create consistent lb here + if err := maglev.CreateLB(cluster);err != nil { + log.Errorf("maglev lb update %v cluster failed: %v",name, err) + } + } if err == nil { // reset api status after successfully updated cluster.ApiStatus = core_v2.ApiStatus_NONE diff --git a/pkg/consistenthash/maglev/maglev.go b/pkg/consistenthash/maglev/maglev.go new file mode 100644 index 000000000..aa7739e07 --- /dev/null +++ b/pkg/consistenthash/maglev/maglev.go @@ -0,0 +1,204 @@ +package maglev + +import ( + "encoding/base64" + "errors" + "fmt" + "unsafe" + + "github.com/cilium/ebpf" + cluster_v2 "kmesh.net/kmesh/api/v2/cluster" + "kmesh.net/kmesh/api/v2/endpoint" + "kmesh.net/kmesh/pkg/logger" + "kmesh.net/kmesh/pkg/utils/hash" +) + +const ( + DefaultTableSize uint64 = 16381 + DefaultHashSeed = "JLfvgnHc2kaSUFaI" + MaglevOuterMapName = "outer_of_maglev" + MaglevInnerMapName = "inner_of_maglev" + MaglevMapMaxEntries = 65536 + ClusterNameMaxLen = 192 +) + +var ( + outer *ebpf.Map + seedMurmur uint32 + maglevTableSize uint64 +) + +var log = logger.NewLoggerField("lb/maglev") + +type Backend struct { + ep *endpoint.Endpoint + index int + offset uint64 + skip uint64 + next uint64 +} + +func InitMaglevMap() error { + log.Println("InitMaglevMap") + + maglevTableSize = DefaultTableSize + + opt := &ebpf.LoadPinOptions{} + outer_map, err := ebpf.LoadPinnedMap("/sys/fs/bpf"+"/bpf_kmesh/map/"+MaglevOuterMapName, opt) + if err != nil { + return fmt.Errorf("load outer map of maglev failed err: %v", err) + } + outer = outer_map + + d, err := base64.StdEncoding.DecodeString(DefaultHashSeed) + if err != nil { + return fmt.Errorf("cannot decode base64 Maglev hash seed %q: %w", DefaultHashSeed, err) + } + if len(d) != 12 { + return fmt.Errorf("decoded hash seed is %d bytes (not 12 bytes)", len(d)) + } + seedMurmur = uint32(d[0])<<24 | uint32(d[1])<<16 | uint32(d[2])<<8 | uint32(d[3]) + + return nil +} + +func CreateLB(cluster *cluster_v2.Cluster) error { + if cluster == nil { + return errors.New("cluster is nil") + } + + clusterName := cluster.GetName() + table, err := getLookupTable(cluster, maglevTableSize) + if err != nil { + return err + } + backendIDs := make([]uint32, maglevTableSize) + for i, id := range table { + backendIDs[i] = uint32(id) + } + + err = updateMaglevTable(backendIDs, clusterName) + if 
err != nil { + return fmt.Errorf("updateMaglevTable fail err:%v", err) + } + + return nil +} + +// createMaglevInnerMap creates a new Maglev inner map in the kernel +// using the given table size. +func createMaglevInnerMap(tableSize uint32) (*ebpf.Map, error) { + spec := &ebpf.MapSpec{ + Name: MaglevInnerMapName, + Type: ebpf.Array, + KeySize: uint32(unsafe.Sizeof(uint32(0))), + ValueSize: uint32(unsafe.Sizeof(uint32(0))) * tableSize, + MaxEntries: 1, + } + + m, err := ebpf.NewMap(spec) + if err != nil { + return nil, err + } + return m, nil +} + +func updateMaglevTable(backendIDs []uint32, clusterName string) error { + log.Debugf("updateMaglevTable... of cluster:%v", clusterName) + if outer == nil { + return errors.New("outer maglev maps not yet initialized") + } + inner, err := createMaglevInnerMap(uint32(maglevTableSize)) + if err != nil { + return err + } + defer inner.Close() + + var key uint32 = 0 + if err := inner.Update(key, backendIDs, 0); err != nil { + return fmt.Errorf("updating backends of cluster %v : %w", clusterName, err) + } + + if len(clusterName) > ClusterNameMaxLen { + clusterName = clusterName[:ClusterNameMaxLen] + } + var maglevKey [ClusterNameMaxLen]byte + copy(maglevKey[:], []byte(clusterName)) + + if err := outer.Update(maglevKey, uint32(inner.FD()), 0); err != nil { + return fmt.Errorf("updating cluster %v: %w", clusterName, err) + } + return nil +} + +func getOffsetAndSkip(address string, m uint64) (uint64, uint64) { + h1, h2 := hash.Hash128([]byte(address), seedMurmur) + offset := h1 % m + skip := (h2 % (m - 1)) + 1 + + return offset, skip +} + +func getPermutation(b Backend) uint64 { + return (b.offset + (b.skip * b.next)) % maglevTableSize +} + +func getLookupTable(cluster *cluster_v2.Cluster, tableSize uint64) ([]int, error) { + + loadAssignment := cluster.GetLoadAssignment() + clusterName := cluster.GetName() + localityLbEps := loadAssignment.GetEndpoints() + + if len(localityLbEps) == 0 { + return nil, fmt.Errorf("current cluster:%v has no any lb endpoints", clusterName) + } + + flatEps := make([]*endpoint.Endpoint, 0) + + //yet not consider weight + for _, localityLbEp := range localityLbEps { + eps := localityLbEp.GetLbEndpoints() + flatEps = append(flatEps, eps...) 
+ } + backends := make([]Backend, 0, len(flatEps)) + + for i, ep := range flatEps { + epOffset, epSkip := getOffsetAndSkip(ep.GetAddress().String(), maglevTableSize) + b := Backend{ + ep: ep, + index: i, + offset: epOffset, + skip: epSkip, + next: 0, + } + backends = append(backends, b) + } + + if len(backends) == 0 { + return nil, fmt.Errorf("current cluster:%v has no any lb backends", clusterName) + } + + length := len(backends) + lookUpTable := make([]int, tableSize) + + for i := uint64(0); i < tableSize; i++ { + lookUpTable[i] = -1 + } + + for n := uint64(0); n < tableSize; n++ { + j := int(n) % length + b := backends[j] + for { + c := getPermutation(b) + for lookUpTable[c] >= 0 { + b.next++ + c = getPermutation(b) + } + lookUpTable[c] = b.index + b.next++ + break + } + } + + return lookUpTable, nil +} diff --git a/pkg/consistenthash/maglev/maglev_test.go b/pkg/consistenthash/maglev/maglev_test.go new file mode 100644 index 000000000..86b97d515 --- /dev/null +++ b/pkg/consistenthash/maglev/maglev_test.go @@ -0,0 +1,176 @@ +package maglev + +import ( + "fmt" + "os" + "testing" + "unsafe" + + "github.com/cilium/ebpf" + "github.com/stretchr/testify/suite" + cluster_v2 "kmesh.net/kmesh/api/v2/cluster" + "kmesh.net/kmesh/api/v2/core" + "kmesh.net/kmesh/api/v2/endpoint" +) + +func TestMaglevTestSuite(t *testing.T) { + suite.Run(t, new(MaglevTestSuite)) +} + +type MaglevTestSuite struct { + mapPath string + suite.Suite +} + +func (suite *MaglevTestSuite) SetupSuite() { + mapPath := "/sys/fs/bpf/bpf_kmesh/map/" + suite.mapPath = mapPath + _, err := os.Stat(mapPath) + if os.IsNotExist(err) { + err := os.MkdirAll(mapPath, 0755) + if err != nil { + fmt.Println("can not mkdir bpf map path", err) + } + } else if err != nil { + fmt.Println("other err:", err) + return + } else { + fmt.Println("bpf map path already exist ", mapPath) + } + dummyInnerMapSpec := newMaglevInnerMapSpecTest(uint32(DefaultTableSize)) + _, err = NewMaglevOuterMap(MaglevOuterMapName, MaglevMapMaxEntries, uint32(DefaultTableSize), dummyInnerMapSpec, mapPath) + if err != nil { + fmt.Printf("NewMaglevOuterMap err: %v\n", err) + } + InitMaglevMap() +} + +func (suite *MaglevTestSuite) TearDownSuite() { + + fmt.Println(">>> From TearDownSuite") +} + +func (suite *MaglevTestSuite) TestCreateLB() { + cluster := newCluster() + clusterName := cluster.GetName() + + err := CreateLB(cluster) + if err != nil { + fmt.Println(err) + } + + var inner_fd uint32 + var maglevKey [ClusterNameMaxLen]byte + + copy(maglevKey[:], []byte(clusterName)) + opt := &ebpf.LoadPinOptions{} + outer_map, err := ebpf.LoadPinnedMap(suite.mapPath+MaglevOuterMapName, opt) + if err != nil { + fmt.Printf("LoadPinnedMap err: %v \n", err) + } + err = outer_map.Lookup(maglevKey, &inner_fd) + if err != nil { + fmt.Printf("Lookup with key %v , err %v \n", clusterName, err) + } + fmt.Println("inner fd: ", inner_fd) + +} + +func (suite *MaglevTestSuite) TestGetLookupTable() { + cluster := newCluster() + + table, err := getLookupTable(cluster, DefaultTableSize) + if err != nil { + fmt.Printf("getLookupTable err:%v \n", err) + } + backendCount := make(map[int]int) + // print backend id distribute + for i := 0; i < len(table); i++ { + fmt.Printf(" %v", table[i]) + backendCount[table[i]]++ + } + fmt.Println() + for k, v := range backendCount { + fmt.Printf("\n backend_id:%v, count:%v\n", k, v) + } + +} + +func newCluster() *cluster_v2.Cluster { + var clusterName string = "outbound|5000||helloworld.default.svc.cluster.local" + lbEndpoints := make([]*endpoint.Endpoint, 0) + 
lbEndpoints = append(lbEndpoints, &endpoint.Endpoint{ + Address: &core.SocketAddress{ + Protocol: 0, + Port: 0, + Ipv4: 4369, + }, + }) + lbEndpoints = append(lbEndpoints, &endpoint.Endpoint{ + Address: &core.SocketAddress{ + Protocol: 0, + Port: 1, + Ipv4: 4369, + }, + }) + lbEndpoints = append(lbEndpoints, &endpoint.Endpoint{ + Address: &core.SocketAddress{ + Protocol: 0, + Port: 2, + Ipv4: 4369, + }, + }) + lbEndpoints = append(lbEndpoints, &endpoint.Endpoint{ + Address: &core.SocketAddress{ + Protocol: 0, + Port: 3, + Ipv4: 4369, + }, + }) + localityLbEndpoints := make([]*endpoint.LocalityLbEndpoints, 0) + llbep := &endpoint.LocalityLbEndpoints{ + LbEndpoints: lbEndpoints, + } + localityLbEndpoints = append(localityLbEndpoints, llbep) + cluster := &cluster_v2.Cluster{ + LbPolicy: cluster_v2.Cluster_MAGLEV, + Name: clusterName, + LoadAssignment: &endpoint.ClusterLoadAssignment{ + ClusterName: clusterName, + Endpoints: localityLbEndpoints, + }, + } + return cluster +} + +// newMaglevInnerMapSpec returns the spec for a maglev inner map. +func newMaglevInnerMapSpecTest(tableSize uint32) *ebpf.MapSpec { + return &ebpf.MapSpec{ + Name: MaglevInnerMapName, + Type: ebpf.Array, + KeySize: uint32(unsafe.Sizeof(uint32(0))), + ValueSize: uint32(unsafe.Sizeof(uint32(0))) * tableSize, + MaxEntries: 1, + } +} + +// NewMaglevOuterMap returns a new object representing a maglev outer map. +func NewMaglevOuterMap(name string, maxEntries int, tableSize uint32, innerMap *ebpf.MapSpec, pinPath string) (*ebpf.Map, error) { + m, err := ebpf.NewMapWithOptions(&ebpf.MapSpec{ + Name: name, + Type: ebpf.HashOfMaps, + KeySize: ClusterNameMaxLen, + ValueSize: uint32(unsafe.Sizeof(uint32(0))), + MaxEntries: uint32(maxEntries), + InnerMap: innerMap, + Pinning: ebpf.PinByName, + }, ebpf.MapOptions{ + PinPath: pinPath, + }) + + if err != nil { + return nil, err + } + + return m, nil +} diff --git a/pkg/controller/ads/cache.go b/pkg/controller/ads/cache.go index c40767a29..434fa5a36 100644 --- a/pkg/controller/ads/cache.go +++ b/pkg/controller/ads/cache.go @@ -441,6 +441,7 @@ func newApiRouteAction(action *config_route_v3.RouteAction) *route_v2.RouteActio RetryPolicy: &route_v2.RetryPolicy{ NumRetries: action.GetRetryPolicy().GetNumRetries().GetValue(), }, + HashPolicy: nil, } switch action.GetClusterSpecifier().(type) { @@ -467,6 +468,27 @@ func newApiRouteAction(action *config_route_v3.RouteAction) *route_v2.RouteActio log.Errorf("newApiRouteAction default, type is %T", action.GetClusterSpecifier()) return nil } + // current only support http header based hash policy + apiHashPolicys := make([]*route_v2.RouteAction_HashPolicy,0) + for _, hp := range action.GetHashPolicy() { + apiHashPolicy := &route_v2.RouteAction_HashPolicy{ + PolicySpecifier: nil, + } + switch hp.GetPolicySpecifier().(type) { + case *config_route_v3.RouteAction_HashPolicy_Header_: + header := &route_v2.RouteAction_HashPolicy_Header{ + HeaderName: hp.GetHeader().HeaderName, + } + apiHashPolicy.PolicySpecifier = &route_v2.RouteAction_HashPolicy_Header_{ + Header: header, + } + apiHashPolicys = append(apiHashPolicys, apiHashPolicy) + default: + log.Errorf("newApiRouteAction HashPolicy default, type is %T", hp.GetPolicySpecifier()) + return nil + } + } + apiAction.HashPolicy = apiHashPolicys return apiAction } diff --git a/pkg/utils/hash/murmur3.go b/pkg/utils/hash/murmur3.go new file mode 100644 index 000000000..c24a8f788 --- /dev/null +++ b/pkg/utils/hash/murmur3.go @@ -0,0 +1,146 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Authors of 
Cilium + + +package hash + + +import ( + "unsafe" +) + + +// Hash128 calculates a 128 bits hash for the given data. It returns different +// results when running on big-endian and little-endian machines. +// +// The code is intended to be as close as possible to the reference implementation +// https://github.com/aappleby/smhasher/blob/master/src/MurmurHash3.cpp. +func Hash128(data []byte, seed uint32) (uint64, uint64) { + const ( + c1 = uint64(0x87c37b91114253d5) + c2 = uint64(0x4cf5ad432745937f) + ) + + + nblocks := len(data) / 16 + + + h1 := uint64(seed) + h2 := uint64(seed) + + + for i := 0; i < nblocks; i++ { + tmp := (*[2]uint64)(unsafe.Pointer(&data[i*16])) + k1 := tmp[0] + k2 := tmp[1] + + + k1 *= c1 + k1 = rotl64(k1, 31) + k1 *= c2 + h1 ^= k1 + h1 = rotl64(h1, 27) + h1 += h2 + h1 = h1*5 + 0x52dce729 + k2 *= c2 + k2 = rotl64(k2, 33) + k2 *= c1 + h2 ^= k2 + h2 = rotl64(h2, 31) + h2 += h1 + h2 = h2*5 + 0x38495ab5 + } + + + k1 := uint64(0) + k2 := uint64(0) + + + tail := data[nblocks*16:] + + + switch len(tail) & 15 { + case 15: + k2 ^= uint64(tail[14]) << 48 + fallthrough + case 14: + k2 ^= uint64(tail[13]) << 40 + fallthrough + case 13: + k2 ^= uint64(tail[12]) << 32 + fallthrough + case 12: + k2 ^= uint64(tail[11]) << 24 + fallthrough + case 11: + k2 ^= uint64(tail[10]) << 16 + fallthrough + case 10: + k2 ^= uint64(tail[9]) << 8 + fallthrough + case 9: + k2 ^= uint64(tail[8]) << 0 + k2 *= c2 + k2 = rotl64(k2, 33) + k2 *= c1 + h2 ^= k2 + fallthrough + case 8: + k1 ^= uint64(tail[7]) << 56 + fallthrough + case 7: + k1 ^= uint64(tail[6]) << 48 + fallthrough + case 6: + k1 ^= uint64(tail[5]) << 40 + fallthrough + case 5: + k1 ^= uint64(tail[4]) << 32 + fallthrough + case 4: + k1 ^= uint64(tail[3]) << 24 + fallthrough + case 3: + k1 ^= uint64(tail[2]) << 16 + fallthrough + case 2: + k1 ^= uint64(tail[1]) << 8 + fallthrough + case 1: + k1 ^= uint64(tail[0]) << 0 + k1 *= c1 + k1 = rotl64(k1, 31) + k1 *= c2 + h1 ^= k1 + } + + + h1 ^= uint64(len(data)) + h2 ^= uint64(len(data)) + h1 += h2 + h2 += h1 + h1 = fmix64(h1) + h2 = fmix64(h2) + h1 += h2 + h2 += h1 + + + return h1, h2 +} + + +func rotl64(x uint64, r int8) uint64 { + return (x << r) | (x >> (64 - r)) +} + + +func fmix64(k uint64) uint64 { + k ^= k >> 33 + k *= 0xff51afd7ed558ccd + k ^= k >> 33 + k *= 0xc4ceb9fe1a85ec53 + k ^= k >> 33 + + + return k +} \ No newline at end of file