diff --git a/api/cluster/cluster.proto b/api/cluster/cluster.proto
index bf5048c45..f75e24b10 100644
--- a/api/cluster/cluster.proto
+++ b/api/cluster/cluster.proto
@@ -12,6 +12,7 @@ message Cluster {
ROUND_ROBIN = 0;
LEAST_REQUEST = 1;
RANDOM = 3;
+ MAGLEV = 5;
}
core.ApiStatus api_status = 128;
diff --git a/api/route/route_components.proto b/api/route/route_components.proto
index 1ae076f8a..6cfe3b250 100644
--- a/api/route/route_components.proto
+++ b/api/route/route_components.proto
@@ -30,10 +30,26 @@ message RouteAction {
// assigned to each cluster.
WeightedCluster weighted_clusters = 3;
}
+
+ message HashPolicy {
+ message Header {
+ // The name of the request header that will be used to obtain the hash
+ // key. If the request header is not present, no hash will be produced.
+ string header_name = 1;
+ }
+
+ oneof policy_specifier {
+
+ // Header hash policy.
+ Header header = 1;
+ }
+ }
+
// the matched prefix (or path) should be swapped with this value.
string prefix_rewrite = 5;
uint32 timeout = 8;
RetryPolicy retry_policy = 9;
+ repeated HashPolicy hash_policy = 15;
}
message RetryPolicy {
diff --git a/bpf/kmesh/ads/include/cluster.h b/bpf/kmesh/ads/include/cluster.h
index e5fd4a358..6aa834a42 100644
--- a/bpf/kmesh/ads/include/cluster.h
+++ b/bpf/kmesh/ads/include/cluster.h
@@ -11,6 +11,8 @@
#include "endpoint/endpoint.pb-c.h"
#define CLUSTER_NAME_MAX_LEN BPF_DATA_MAX_LEN
+#define MAGLEV_TABLE_SIZE 16381
+#define HASH_INIT4_SEED 0xcafe
struct {
__uint(type, BPF_MAP_TYPE_HASH);
@@ -20,6 +22,22 @@ struct {
__uint(max_entries, MAP_SIZE_OF_CLUSTER);
} map_of_cluster SEC(".maps");
+struct inner_of_maglev {
+ __uint(type, BPF_MAP_TYPE_ARRAY);
+ __uint(max_entries, 1);
+ __uint(key_size, sizeof(__u32));
+ __uint(value_size, sizeof(__u32) * MAGLEV_TABLE_SIZE);
+};
+
+struct {
+ __uint(type, BPF_MAP_TYPE_HASH_OF_MAPS);
+ __uint(key_size, CLUSTER_NAME_MAX_LEN);
+ __uint(value_size, sizeof(__u32));
+ __uint(max_entries, MAP_SIZE_OF_CLUSTER);
+ __uint(map_flags, BPF_F_NO_PREALLOC);
+ __array(values, struct inner_of_maglev);
+} outer_of_maglev SEC(".maps");
+
struct cluster_endpoints {
__u32 ep_num;
/* */
@@ -63,6 +81,16 @@ static inline struct cluster_endpoints *map_lookup_cluster_eps(const char *clust
return kmesh_map_lookup_elem(&map_of_cluster_eps, cluster_name);
}
+static inline __u32 *map_lookup_cluster_inner_map(const char *cluster_name)
+{
+ return kmesh_map_lookup_elem(&outer_of_maglev, cluster_name);
+}
+
+static inline __u32 *map_lookup_lb_hash() {
+ int location = 0;
+ return kmesh_map_lookup_elem(&map_of_lb_hash,&location);
+}
+
static inline int map_add_cluster_eps(const char *cluster_name, const struct cluster_endpoints *eps)
{
return kmesh_map_update_elem(&map_of_cluster_eps, cluster_name, eps);
@@ -219,7 +247,85 @@ static inline void *loadbalance_round_robin(struct cluster_endpoints *eps)
return (void *)eps->ep_identity[idx];
}
-static inline void *cluster_get_ep_identity_by_lb_policy(struct cluster_endpoints *eps, __u32 lb_policy)
+/* The daddr is explicitly excluded from the hash here in order to allow for
+ * backend selection to choose the same backend even on different service VIPs.
+ */
+// static __always_inline __u32 hash_from_tuple_v4(struct bpf_sock * sk)
+// {
+
+// BPF_LOG(INFO, CLUSTER, "sk: src_ip is:%x, src_port is:%d\n", sk->src_ip4,sk->src_port);
+// BPF_LOG(INFO, CLUSTER, "sk: dst_port is:%d, sk->protocol is: %x\n",sk->dst_port,sk->protocol);
+// return jhash_3words(sk->src_ip4,
+// ((__u32) sk->dst_port << 16) | sk->src_port,
+// sk->protocol, HASH_INIT4_SEED);
+// }
+
+static __always_inline __u32 map_array_get_32(__u32 *array, __u32 index, const __u32 limit)
+{
+ __u32 datum = 0;
+
+ // if (__builtin_constant_p(index) ||
+ // !__builtin_constant_p(limit))
+ // __throw_build_bug();
+
+ /* LLVM tends to optimize code away that is needed for the verifier to
+ * understand dynamic map access. Input constraint is that index < limit
+ * for this util function, so we never fail here, and returned datum is
+ * always valid.
+ */
+ asm volatile("%[index] <<= 2\n\t"
+ "if %[index] > %[limit] goto +1\n\t"
+ "%[array] += %[index]\n\t"
+ "%[datum] = *(u32 *)(%[array] + 0)\n\t"
+ : [datum] "=r"(datum)
+ : [limit] "i"(limit), [array] "r"(array), [index] "r"(index)
+ : /* no clobbers */);
+
+ return datum;
+}
+
+static inline void *loadbalance_maglev_select_backend(struct cluster_endpoints *eps, char *name, ctx_buff_t *ctx)
+{
+ if (!eps || eps->ep_num == 0)
+ return NULL;
+
+ __u32 inner_key = 0;
+ __u32 *backend_ids;
+ __u32 index;
+ __u32 id;
+ __u32 *inner_of_maglev;
+ struct bpf_sock_ops *skops;
+ __u32 *hash_ptr;
+ __u32 hash;
+
+ inner_of_maglev = map_lookup_cluster_inner_map(name);
+ if (!inner_of_maglev) {
+ return NULL;
+ }
+ backend_ids = bpf_map_lookup_elem(inner_of_maglev, &inner_key);
+ if (!backend_ids) {
+ return NULL;
+ }
+ hash_ptr = map_lookup_lb_hash();
+ if (!hash_ptr || *hash_ptr == 0) {
+ hash = bpf_get_prandom_u32();
+ }else {
+ hash = *hash_ptr;
+ BPF_LOG(INFO, CLUSTER, "lb_policy is maglev, got a hash value:%u\n", hash);
+ }
+ index = hash % MAGLEV_TABLE_SIZE;
+ if (index >= MAGLEV_TABLE_SIZE)
+ return NULL;
+ id = map_array_get_32(backend_ids, index, MAGLEV_TABLE_SIZE);
+ BPF_LOG(INFO, CLUSTER, "lb_policy is maglev, select backend id:%u\n", id);
+ if (id >= KMESH_PER_ENDPOINT_NUM)
+ return NULL;
+
+ return (void *)eps->ep_identity[id];
+}
+
+static inline void *
+cluster_get_ep_identity_by_lb_policy(struct cluster_endpoints *eps, __u32 lb_policy, char *name, ctx_buff_t *ctx)
{
void *ep_identity = NULL;
@@ -227,6 +333,9 @@ static inline void *cluster_get_ep_identity_by_lb_policy(struct cluster_endpoint
case CLUSTER__CLUSTER__LB_POLICY__ROUND_ROBIN:
ep_identity = loadbalance_round_robin(eps);
break;
+ case CLUSTER__CLUSTER__LB_POLICY__MAGLEV:
+ ep_identity = loadbalance_maglev_select_backend(eps, name, ctx);
+ break;
default:
BPF_LOG(INFO, CLUSTER, "%d lb_policy is unsupported, default:ROUND_ROBIN\n", lb_policy);
ep_identity = loadbalance_round_robin(eps);
@@ -254,7 +363,8 @@ static inline Core__SocketAddress *cluster_get_ep_sock_addr(const void *ep_ident
return sock_addr;
}
-static inline int cluster_handle_loadbalance(Cluster__Cluster *cluster, address_t *addr, ctx_buff_t *ctx)
+static inline int
+cluster_handle_loadbalance(Cluster__Cluster *cluster, address_t *addr, ctx_buff_t *ctx, const char *cluster_name)
{
char *name = NULL;
void *ep_identity = NULL;
@@ -273,7 +383,7 @@ static inline int cluster_handle_loadbalance(Cluster__Cluster *cluster, address_
return -EAGAIN;
}
- ep_identity = cluster_get_ep_identity_by_lb_policy(eps, cluster->lb_policy);
+ ep_identity = cluster_get_ep_identity_by_lb_policy(eps, cluster->lb_policy, cluster_name, ctx);
if (!ep_identity) {
BPF_LOG(ERR, CLUSTER, "cluster=\"%s\" handle lb failed\n", name);
return -EAGAIN;
diff --git a/bpf/kmesh/ads/include/config.h b/bpf/kmesh/ads/include/config.h
index 714a06a19..a9d2a4913 100644
--- a/bpf/kmesh/ads/include/config.h
+++ b/bpf/kmesh/ads/include/config.h
@@ -54,17 +54,19 @@
// ************
// array len
-#define KMESH_NAME_LEN 64
-#define KMESH_TYPE_LEN 64
-#define KMESH_HOST_LEN 128
-#define KMESH_FILTER_CHAINS_LEN 64
-#define KMESH_HTTP_DOMAIN_NUM 32
-#define KMESH_HTTP_DOMAIN_LEN 128
-#define KMESH_PER_FILTER_CHAIN_NUM MAP_SIZE_OF_PER_FILTER_CHAIN
-#define KMESH_PER_FILTER_NUM MAP_SIZE_OF_PER_FILTER
-#define KMESH_PER_VIRT_HOST_NUM MAP_SIZE_OF_PER_VIRTUAL_HOST
-#define KMESH_PER_ROUTE_NUM MAP_SIZE_OF_PER_ROUTE
-#define KMESH_PER_ENDPOINT_NUM MAP_SIZE_OF_PER_ENDPOINT
-#define KMESH_PER_HEADER_MUM 32
-#define KMESH_PER_WEIGHT_CLUSTER_NUM 32
+#define KMESH_NAME_LEN 64
+#define KMESH_TYPE_LEN 64
+#define KMESH_HOST_LEN 128
+#define KMESH_FILTER_CHAINS_LEN 64
+#define KMESH_HTTP_DOMAIN_NUM 32
+#define KMESH_HTTP_DOMAIN_LEN 128
+#define KMESH_PER_FILTER_CHAIN_NUM MAP_SIZE_OF_PER_FILTER_CHAIN
+#define KMESH_PER_FILTER_NUM MAP_SIZE_OF_PER_FILTER
+#define KMESH_PER_VIRT_HOST_NUM MAP_SIZE_OF_PER_VIRTUAL_HOST
+#define KMESH_PER_ROUTE_NUM MAP_SIZE_OF_PER_ROUTE
+#define KMESH_PER_ENDPOINT_NUM MAP_SIZE_OF_PER_ENDPOINT
+#define KMESH_PER_HEADER_MUM 32
+#define KMESH_PER_WEIGHT_CLUSTER_NUM 32
+#define KMESH_PER_HASH_POLICY_NUM 1
+#define KMESH_PER_HASH_POLICY_MSG_HEADER_LEN 10
#endif // _CONFIG_H_
diff --git a/bpf/kmesh/ads/include/kmesh_common.h b/bpf/kmesh/ads/include/kmesh_common.h
index 2bb61339d..9f5750f16 100644
--- a/bpf/kmesh/ads/include/kmesh_common.h
+++ b/bpf/kmesh/ads/include/kmesh_common.h
@@ -69,6 +69,13 @@ static inline char *bpf_strncpy(char *dst, int n, const char *src)
}
#endif
+struct {
+ __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
+ __uint(key_size, sizeof(int));
+ __uint(value_size, sizeof(__u32));
+ __uint(max_entries, 1);
+} map_of_lb_hash SEC(".maps");
+
typedef enum {
KMESH_TAIL_CALL_LISTENER = 1,
KMESH_TAIL_CALL_FILTER_CHAIN,
diff --git a/bpf/kmesh/ads/include/route_config.h b/bpf/kmesh/ads/include/route_config.h
index a90c4a153..72883ad8d 100644
--- a/bpf/kmesh/ads/include/route_config.h
+++ b/bpf/kmesh/ads/include/route_config.h
@@ -334,6 +334,89 @@ static inline char *route_get_cluster(const Route__Route *route)
return kmesh_get_ptr_val(_(route_act->cluster));
}
+static inline __u32 lb_hash_based_header(unsigned char *header_name)
+{
+ __u32 hash = 0;
+ struct bpf_mem_ptr *msg_header = NULL;
+ char *msg_header_v = NULL;
+ __u32 msg_header_len = 0;
+ __u32 k;
+ __u32 c;
+ char msg_header_cp[KMESH_PER_HASH_POLICY_MSG_HEADER_LEN] = {'\0'};
+
+ if (!header_name)
+ return hash;
+ // when header name is not null, compute a hash value.
+ BPF_LOG(INFO, ROUTER_CONFIG, "Got a header name:%s\n", header_name);
+ msg_header = (struct bpf_mem_ptr *)bpf_get_msg_header_element(header_name);
+ if (!msg_header)
+ return hash;
+
+ msg_header_v = _(msg_header->ptr);
+ msg_header_len = _(msg_header->size);
+ if (!msg_header_len)
+ return hash;
+
+ BPF_LOG(INFO, ROUTER_CONFIG, "Got a header value len:%u\n", msg_header_len);
+ if (!bpf_strncpy(msg_header_cp, msg_header_len, msg_header_v))
+ return hash;
+ BPF_LOG(INFO, ROUTER_CONFIG, "Got a header value:%s\n", msg_header_cp);
+ hash = 5318;
+#pragma unroll
+ for (k = 0; k < KMESH_PER_HASH_POLICY_MSG_HEADER_LEN; k++) {
+ if (!msg_header_v || k >= msg_header_len)
+ break;
+ c = *((unsigned char *)msg_header_cp + k);
+ hash = ((hash << 5) + hash) + c;
+ }
+
+ return hash;
+}
+
+static inline int map_add_lb_hash(__u32 hash)
+{
+ int location = 0;
+ return kmesh_map_update_elem(&map_of_lb_hash, &location, &hash);
+}
+
+static void consistent_hash_policy(const Route__Route *route)
+{
+ Route__RouteAction *route_act = NULL;
+ void *hash_policy_ptr = NULL;
+ __u32 i;
+ Route__RouteAction__HashPolicy *hash_policy;
+ char *header_name = NULL;
+ Route__RouteAction__HashPolicy__Header *header;
+ uint32_t lb_hash = 0;
+
+ route_act = kmesh_get_ptr_val(_(route->route));
+ if (route_act) {
+ hash_policy_ptr = kmesh_get_ptr_val(_(route_act->hash_policy));
+ }
+
+#pragma unroll
+ for (i = 0; i < KMESH_PER_HASH_POLICY_NUM; i++) {
+ if (hash_policy_ptr == NULL) {
+ break;
+ }
+ hash_policy = (Route__RouteAction__HashPolicy *)kmesh_get_ptr_val((void *)*((__u64 *)hash_policy_ptr + i));
+ if (!hash_policy)
+ break;
+ if (hash_policy->policy_specifier_case == ROUTE__ROUTE_ACTION__HASH_POLICY__POLICY_SPECIFIER_HEADER) {
+ header = kmesh_get_ptr_val(_(hash_policy->header));
+ if (!header)
+ continue;
+ header_name = kmesh_get_ptr_val(_(header->header_name));
+ break;
+ }
+ }
+ lb_hash = lb_hash_based_header(header_name);
+ if (map_add_lb_hash(lb_hash) != 0) {
+ BPF_LOG(ERR, ROUTER_CONFIG, "failed to update cluster lb hash value\n");
+ }
+ BPF_LOG(INFO, ROUTER_CONFIG, "Got a lb hash:%u\n", lb_hash);
+}
+
SEC_TAIL(KMESH_PORG_CALLS, KMESH_TAIL_CALL_ROUTER_CONFIG)
int route_config_manager(ctx_buff_t *ctx)
{
@@ -373,6 +456,7 @@ int route_config_manager(ctx_buff_t *ctx)
}
cluster = route_get_cluster(route);
+ consistent_hash_policy(route);
if (!cluster) {
BPF_LOG(ERR, ROUTER_CONFIG, "failed to get cluster\n");
return KMESH_TAIL_CALL_RET(-1);
diff --git a/docs/proposal/consistent_hash.md b/docs/proposal/consistent_hash.md
new file mode 100644
index 000000000..2b7f3d51f
--- /dev/null
+++ b/docs/proposal/consistent_hash.md
@@ -0,0 +1,171 @@
+---
+title: Consistent hash LB
+authors:
+- "@bfforever" # Authors' GitHub accounts here.
+reviewers:
+- "@supercharge-xsy"
+- "@hzxuzhonghu"
+- "@nlwcy"
+- TBD
+approvers:
+- "@robot"
+- TBD
+
+creation-date: 2024-05-29
+
+---
+
+## Consistent hash based load balancing
+
+Add Consistent hash based LB in ebpf prog
+
+### Summary
+
+
+Maglev consistent hash algorithm
+IP tuple info based hash
+L7 http header based hash
+
+
+#### Goals
+
+
+Networking load balance
+Guarantee that a client connection always sends requests to the same backend
+Minimize remapping
+
+
+### Proposal
+
+#### maglev
+Algorithm main purpose: map all backends into a fixed look up table, require even mapping, and try to minimize mapping changes when backend change (remove or add a backend), avoid global remapping.
+
+
+#### ebpf implement situation or point
+
+Cgroup level
+In cgroup socket implement l4 and l7 header based consistent hash.
+Container veth tc level
+In tc level implement l4 based and l7 based consistent hash.
+
+
+
+### Design Details
+
+![consistent_hash](pics/kmesh_consistent_hash.svg)
+#### maglev
+main steps:
+
+
+Map endpoints of Cluster to a fixed table
+Based on the Cluster config from istiod, generate an integer table whose values are endpoint indexes;
+
+
+Base l4 or l7 info compute a hash value
+use the hash value to get a table index, which is index = hash % len(table)
;
+Use index to access table
+ep=tables[index]
+
+
+
+##### maglev lookup table
+Use a compact table; each table entry is only as wide as needed to index the endpoint count.
+For the table size default is 16381.
+
+#### L4 implement
+
+(saddr,daddr,sport,dport,protocol)-->hash
+then execute above second step.
+
+
+##### risk in l4 implement
+At the cgroup level, because saddr and sport are not available, L4 load balancing cannot be done effectively.
+
+By the advice of @supercharge-xsy, add (net namespace cookie + daddr + dport ) to compute hash value.
+
+
+#### L7 implement
+```
+apiVersion: networking.istio.io/v1alpha3
+kind: DestinationRule
+metadata:
+ name: helloworld
+spec:
+ host: helloworld
+ trafficPolicy:
+ loadBalancer:
+ consistentHash:
+ httpHeaderName: testHeader
+ maglev:
+ tableSize:
+```
+
+specify which l7 header is selected to do hash
+extract l7 header info from http header
+compute a hash value based on the header value
+then execute above second step
+
+
+##### Risk in l7 header value to compute a hash
+Because eBPF programs lack helpers for string-based hash computation.
+
+Below is a simple string-based hash method. We probably need a more effective hash method.
+```
+static inline __u32 lb_hash_based_header(unsigned char *header_name) {
+ __u32 hash = 0;
+ struct bpf_mem_ptr *msg_header = NULL;
+ char *msg_header_v = NULL;
+ __u32 msg_header_len = 0;
+ __u32 k;
+ __u32 c;
+ char msg_header_cp[KMESH_PER_HASH_POLICY_MSG_HEADER_LEN] = {'\0'};
+
+ if (!header_name)
+ return hash;
+ // when header name is not null, compute a hash value.
+ BPF_LOG(INFO, ROUTER_CONFIG, "Got a header name:%s\n", header_name);
+ msg_header = (struct bpf_mem_ptr *)bpf_get_msg_header_element(header_name);
+ if (!msg_header)
+ return hash;
+
+ msg_header_v = _(msg_header->ptr);
+ msg_header_len = _(msg_header->size);
+ if (!msg_header_len)
+ return hash;
+
+ BPF_LOG(INFO, ROUTER_CONFIG, "Got a header value len:%u\n", msg_header_len);
+ if (!bpf_strncpy(msg_header_cp, msg_header_len, msg_header_v))
+ return hash;
+ BPF_LOG(INFO, ROUTER_CONFIG, "Got a header value:%s\n", msg_header_cp);
+ // Currently a simple hash method
+ hash = 5318;
+# pragma unroll
+ for (k = 0; k < KMESH_PER_HASH_POLICY_MSG_HEADER_LEN;k++) {
+ if (!msg_header_v || k >= msg_header_len)
+ break;
+ c = *((unsigned char *)msg_header_cp + k);
+ hash = ((hash << 5) + hash) + c;
+ }
+
+ return hash;
+}
+```
+
+
+#### Cgroup level
+Directly through lb logic, to change the daddr and dport of socket.
+
+#### Tc level
+Do lb algorithm based on network packet or message, to change the daddr and dport of packet.
+
+This way can get full ip address info to do lb.
+
+#### Test Plan
+
+
+Table mapping is evenly distributed
+Adding or removing an endpoint only affects a few connections
+
+
+#### Ring hash implement reference
+https://github.com/envoyproxy/envoy/blob/b0cc284f86e328a33525d6d2b8f6ab00fff240b8/source/extensions/load_balancing_policies/ring_hash/ring_hash_lb.cc#L92
diff --git a/docs/proposal/pics/kmesh_consistent_hash.svg b/docs/proposal/pics/kmesh_consistent_hash.svg
new file mode 100644
index 000000000..a40d4b250
--- /dev/null
+++ b/docs/proposal/pics/kmesh_consistent_hash.svg
@@ -0,0 +1 @@
+out map inner map cluster nam... inner ma... ep_ids 0 0 1 2 1 0 2 1 table ebpf prog lb logicĀ hash % len(table) istiod Kmesh Daemon 0 2 endpoi... table generate by endpoints with maglev algorithm
table generate by endpoints with maglev algorithm 1 sock_ad... sock_ad... sock_ad... Text is not SVG - cannot display
\ No newline at end of file
diff --git a/pkg/bpf/bpf.go b/pkg/bpf/bpf.go
index 726296e05..81945c28e 100644
--- a/pkg/bpf/bpf.go
+++ b/pkg/bpf/bpf.go
@@ -36,6 +36,7 @@ import (
"kmesh.net/kmesh/pkg/constants"
"kmesh.net/kmesh/pkg/logger"
"kmesh.net/kmesh/pkg/version"
+ "kmesh.net/kmesh/pkg/consistenthash/maglev"
)
var (
@@ -101,6 +102,12 @@ func (l *BpfLoader) StartAdsMode() (err error) {
l.Stop()
return fmt.Errorf("deserial_init failed:%v", ret)
}
+
+ if err = maglev.InitMaglevMap(); err != nil {
+ l.Stop()
+ return fmt.Errorf("lb maglev init failed, %s", err)
+ }
+
return nil
}
diff --git a/pkg/cache/v2/cluster.go b/pkg/cache/v2/cluster.go
index 357d5243e..75332f50c 100644
--- a/pkg/cache/v2/cluster.go
+++ b/pkg/cache/v2/cluster.go
@@ -27,6 +27,7 @@ import (
cluster_v2 "kmesh.net/kmesh/api/v2/cluster"
core_v2 "kmesh.net/kmesh/api/v2/core"
maps_v2 "kmesh.net/kmesh/pkg/cache/v2/maps"
+ "kmesh.net/kmesh/pkg/consistenthash/maglev"
)
type ClusterCache struct {
@@ -129,6 +130,12 @@ func (cache *ClusterCache) Flush() {
for name, cluster := range cache.apiClusterCache {
if cluster.GetApiStatus() == core_v2.ApiStatus_UPDATE {
err := maps_v2.ClusterUpdate(name, cluster)
+ if cluster.GetLbPolicy() == cluster_v2.Cluster_MAGLEV {
+ // create consistent lb here
+ if err := maglev.CreateLB(cluster);err != nil {
+ log.Errorf("maglev lb update %v cluster failed: %v",name, err)
+ }
+ }
if err == nil {
// reset api status after successfully updated
cluster.ApiStatus = core_v2.ApiStatus_NONE
diff --git a/pkg/consistenthash/maglev/maglev.go b/pkg/consistenthash/maglev/maglev.go
new file mode 100644
index 000000000..aa7739e07
--- /dev/null
+++ b/pkg/consistenthash/maglev/maglev.go
@@ -0,0 +1,204 @@
+package maglev
+
+import (
+ "encoding/base64"
+ "errors"
+ "fmt"
+ "unsafe"
+
+ "github.com/cilium/ebpf"
+ cluster_v2 "kmesh.net/kmesh/api/v2/cluster"
+ "kmesh.net/kmesh/api/v2/endpoint"
+ "kmesh.net/kmesh/pkg/logger"
+ "kmesh.net/kmesh/pkg/utils/hash"
+)
+
+const (
+ DefaultTableSize uint64 = 16381
+ DefaultHashSeed = "JLfvgnHc2kaSUFaI"
+ MaglevOuterMapName = "outer_of_maglev"
+ MaglevInnerMapName = "inner_of_maglev"
+ MaglevMapMaxEntries = 65536
+ ClusterNameMaxLen = 192
+)
+
+var (
+ outer *ebpf.Map
+ seedMurmur uint32
+ maglevTableSize uint64
+)
+
+var log = logger.NewLoggerField("lb/maglev")
+
+type Backend struct {
+ ep *endpoint.Endpoint
+ index int
+ offset uint64
+ skip uint64
+ next uint64
+}
+
+func InitMaglevMap() error {
+ log.Println("InitMaglevMap")
+
+ maglevTableSize = DefaultTableSize
+
+ opt := &ebpf.LoadPinOptions{}
+ outer_map, err := ebpf.LoadPinnedMap("/sys/fs/bpf"+"/bpf_kmesh/map/"+MaglevOuterMapName, opt)
+ if err != nil {
+ return fmt.Errorf("load outer map of maglev failed err: %v", err)
+ }
+ outer = outer_map
+
+ d, err := base64.StdEncoding.DecodeString(DefaultHashSeed)
+ if err != nil {
+ return fmt.Errorf("cannot decode base64 Maglev hash seed %q: %w", DefaultHashSeed, err)
+ }
+ if len(d) != 12 {
+ return fmt.Errorf("decoded hash seed is %d bytes (not 12 bytes)", len(d))
+ }
+ seedMurmur = uint32(d[0])<<24 | uint32(d[1])<<16 | uint32(d[2])<<8 | uint32(d[3])
+
+ return nil
+}
+
+func CreateLB(cluster *cluster_v2.Cluster) error {
+ if cluster == nil {
+ return errors.New("cluster is nil")
+ }
+
+ clusterName := cluster.GetName()
+ table, err := getLookupTable(cluster, maglevTableSize)
+ if err != nil {
+ return err
+ }
+ backendIDs := make([]uint32, maglevTableSize)
+ for i, id := range table {
+ backendIDs[i] = uint32(id)
+ }
+
+ err = updateMaglevTable(backendIDs, clusterName)
+ if err != nil {
+ return fmt.Errorf("updateMaglevTable fail err:%v", err)
+ }
+
+ return nil
+}
+
+// createMaglevInnerMap creates a new Maglev inner map in the kernel
+// using the given table size.
+func createMaglevInnerMap(tableSize uint32) (*ebpf.Map, error) {
+ spec := &ebpf.MapSpec{
+ Name: MaglevInnerMapName,
+ Type: ebpf.Array,
+ KeySize: uint32(unsafe.Sizeof(uint32(0))),
+ ValueSize: uint32(unsafe.Sizeof(uint32(0))) * tableSize,
+ MaxEntries: 1,
+ }
+
+ m, err := ebpf.NewMap(spec)
+ if err != nil {
+ return nil, err
+ }
+ return m, nil
+}
+
+func updateMaglevTable(backendIDs []uint32, clusterName string) error {
+ log.Debugf("updateMaglevTable... of cluster:%v", clusterName)
+ if outer == nil {
+ return errors.New("outer maglev maps not yet initialized")
+ }
+ inner, err := createMaglevInnerMap(uint32(maglevTableSize))
+ if err != nil {
+ return err
+ }
+ defer inner.Close()
+
+ var key uint32 = 0
+ if err := inner.Update(key, backendIDs, 0); err != nil {
+ return fmt.Errorf("updating backends of cluster %v : %w", clusterName, err)
+ }
+
+ if len(clusterName) > ClusterNameMaxLen {
+ clusterName = clusterName[:ClusterNameMaxLen]
+ }
+ var maglevKey [ClusterNameMaxLen]byte
+ copy(maglevKey[:], []byte(clusterName))
+
+ if err := outer.Update(maglevKey, uint32(inner.FD()), 0); err != nil {
+ return fmt.Errorf("updating cluster %v: %w", clusterName, err)
+ }
+ return nil
+}
+
+func getOffsetAndSkip(address string, m uint64) (uint64, uint64) {
+ h1, h2 := hash.Hash128([]byte(address), seedMurmur)
+ offset := h1 % m
+ skip := (h2 % (m - 1)) + 1
+
+ return offset, skip
+}
+
+func getPermutation(b Backend) uint64 {
+ return (b.offset + (b.skip * b.next)) % maglevTableSize
+}
+
+func getLookupTable(cluster *cluster_v2.Cluster, tableSize uint64) ([]int, error) {
+
+ loadAssignment := cluster.GetLoadAssignment()
+ clusterName := cluster.GetName()
+ localityLbEps := loadAssignment.GetEndpoints()
+
+ if len(localityLbEps) == 0 {
+ return nil, fmt.Errorf("current cluster:%v has no any lb endpoints", clusterName)
+ }
+
+ flatEps := make([]*endpoint.Endpoint, 0)
+
+ //yet not consider weight
+ for _, localityLbEp := range localityLbEps {
+ eps := localityLbEp.GetLbEndpoints()
+ flatEps = append(flatEps, eps...)
+ }
+ backends := make([]Backend, 0, len(flatEps))
+
+ for i, ep := range flatEps {
+ epOffset, epSkip := getOffsetAndSkip(ep.GetAddress().String(), maglevTableSize)
+ b := Backend{
+ ep: ep,
+ index: i,
+ offset: epOffset,
+ skip: epSkip,
+ next: 0,
+ }
+ backends = append(backends, b)
+ }
+
+ if len(backends) == 0 {
+ return nil, fmt.Errorf("current cluster:%v has no any lb backends", clusterName)
+ }
+
+ length := len(backends)
+ lookUpTable := make([]int, tableSize)
+
+ for i := uint64(0); i < tableSize; i++ {
+ lookUpTable[i] = -1
+ }
+
+ for n := uint64(0); n < tableSize; n++ {
+ j := int(n) % length
+ b := backends[j]
+ for {
+ c := getPermutation(b)
+ for lookUpTable[c] >= 0 {
+ b.next++
+ c = getPermutation(b)
+ }
+ lookUpTable[c] = b.index
+ b.next++
+ break
+ }
+ }
+
+ return lookUpTable, nil
+}
diff --git a/pkg/consistenthash/maglev/maglev_test.go b/pkg/consistenthash/maglev/maglev_test.go
new file mode 100644
index 000000000..86b97d515
--- /dev/null
+++ b/pkg/consistenthash/maglev/maglev_test.go
@@ -0,0 +1,176 @@
+package maglev
+
+import (
+ "fmt"
+ "os"
+ "testing"
+ "unsafe"
+
+ "github.com/cilium/ebpf"
+ "github.com/stretchr/testify/suite"
+ cluster_v2 "kmesh.net/kmesh/api/v2/cluster"
+ "kmesh.net/kmesh/api/v2/core"
+ "kmesh.net/kmesh/api/v2/endpoint"
+)
+
+func TestMaglevTestSuite(t *testing.T) {
+ suite.Run(t, new(MaglevTestSuite))
+}
+
+type MaglevTestSuite struct {
+ mapPath string
+ suite.Suite
+}
+
+func (suite *MaglevTestSuite) SetupSuite() {
+ mapPath := "/sys/fs/bpf/bpf_kmesh/map/"
+ suite.mapPath = mapPath
+ _, err := os.Stat(mapPath)
+ if os.IsNotExist(err) {
+ err := os.MkdirAll(mapPath, 0755)
+ if err != nil {
+ fmt.Println("can not mkdir bpf map path", err)
+ }
+ } else if err != nil {
+ fmt.Println("other err:", err)
+ return
+ } else {
+ fmt.Println("bpf map path already exist ", mapPath)
+ }
+ dummyInnerMapSpec := newMaglevInnerMapSpecTest(uint32(DefaultTableSize))
+ _, err = NewMaglevOuterMap(MaglevOuterMapName, MaglevMapMaxEntries, uint32(DefaultTableSize), dummyInnerMapSpec, mapPath)
+ if err != nil {
+ fmt.Printf("NewMaglevOuterMap err: %v\n", err)
+ }
+ InitMaglevMap()
+}
+
+func (suite *MaglevTestSuite) TearDownSuite() {
+
+ fmt.Println(">>> From TearDownSuite")
+}
+
+func (suite *MaglevTestSuite) TestCreateLB() {
+ cluster := newCluster()
+ clusterName := cluster.GetName()
+
+ err := CreateLB(cluster)
+ if err != nil {
+ fmt.Println(err)
+ }
+
+ var inner_fd uint32
+ var maglevKey [ClusterNameMaxLen]byte
+
+ copy(maglevKey[:], []byte(clusterName))
+ opt := &ebpf.LoadPinOptions{}
+ outer_map, err := ebpf.LoadPinnedMap(suite.mapPath+MaglevOuterMapName, opt)
+ if err != nil {
+ fmt.Printf("LoadPinnedMap err: %v \n", err)
+ }
+ err = outer_map.Lookup(maglevKey, &inner_fd)
+ if err != nil {
+ fmt.Printf("Lookup with key %v , err %v \n", clusterName, err)
+ }
+ fmt.Println("inner fd: ", inner_fd)
+
+}
+
+func (suite *MaglevTestSuite) TestGetLookupTable() {
+ cluster := newCluster()
+
+ table, err := getLookupTable(cluster, DefaultTableSize)
+ if err != nil {
+ fmt.Printf("getLookupTable err:%v \n", err)
+ }
+ backendCount := make(map[int]int)
+ // print backend id distribute
+ for i := 0; i < len(table); i++ {
+ fmt.Printf(" %v", table[i])
+ backendCount[table[i]]++
+ }
+ fmt.Println()
+ for k, v := range backendCount {
+ fmt.Printf("\n backend_id:%v, count:%v\n", k, v)
+ }
+
+}
+
+func newCluster() *cluster_v2.Cluster {
+ var clusterName string = "outbound|5000||helloworld.default.svc.cluster.local"
+ lbEndpoints := make([]*endpoint.Endpoint, 0)
+ lbEndpoints = append(lbEndpoints, &endpoint.Endpoint{
+ Address: &core.SocketAddress{
+ Protocol: 0,
+ Port: 0,
+ Ipv4: 4369,
+ },
+ })
+ lbEndpoints = append(lbEndpoints, &endpoint.Endpoint{
+ Address: &core.SocketAddress{
+ Protocol: 0,
+ Port: 1,
+ Ipv4: 4369,
+ },
+ })
+ lbEndpoints = append(lbEndpoints, &endpoint.Endpoint{
+ Address: &core.SocketAddress{
+ Protocol: 0,
+ Port: 2,
+ Ipv4: 4369,
+ },
+ })
+ lbEndpoints = append(lbEndpoints, &endpoint.Endpoint{
+ Address: &core.SocketAddress{
+ Protocol: 0,
+ Port: 3,
+ Ipv4: 4369,
+ },
+ })
+ localityLbEndpoints := make([]*endpoint.LocalityLbEndpoints, 0)
+ llbep := &endpoint.LocalityLbEndpoints{
+ LbEndpoints: lbEndpoints,
+ }
+ localityLbEndpoints = append(localityLbEndpoints, llbep)
+ cluster := &cluster_v2.Cluster{
+ LbPolicy: cluster_v2.Cluster_MAGLEV,
+ Name: clusterName,
+ LoadAssignment: &endpoint.ClusterLoadAssignment{
+ ClusterName: clusterName,
+ Endpoints: localityLbEndpoints,
+ },
+ }
+ return cluster
+}
+
+// newMaglevInnerMapSpec returns the spec for a maglev inner map.
+func newMaglevInnerMapSpecTest(tableSize uint32) *ebpf.MapSpec {
+ return &ebpf.MapSpec{
+ Name: MaglevInnerMapName,
+ Type: ebpf.Array,
+ KeySize: uint32(unsafe.Sizeof(uint32(0))),
+ ValueSize: uint32(unsafe.Sizeof(uint32(0))) * tableSize,
+ MaxEntries: 1,
+ }
+}
+
+// NewMaglevOuterMap returns a new object representing a maglev outer map.
+func NewMaglevOuterMap(name string, maxEntries int, tableSize uint32, innerMap *ebpf.MapSpec, pinPath string) (*ebpf.Map, error) {
+ m, err := ebpf.NewMapWithOptions(&ebpf.MapSpec{
+ Name: name,
+ Type: ebpf.HashOfMaps,
+ KeySize: ClusterNameMaxLen,
+ ValueSize: uint32(unsafe.Sizeof(uint32(0))),
+ MaxEntries: uint32(maxEntries),
+ InnerMap: innerMap,
+ Pinning: ebpf.PinByName,
+ }, ebpf.MapOptions{
+ PinPath: pinPath,
+ })
+
+ if err != nil {
+ return nil, err
+ }
+
+ return m, nil
+}
diff --git a/pkg/controller/ads/cache.go b/pkg/controller/ads/cache.go
index c40767a29..434fa5a36 100644
--- a/pkg/controller/ads/cache.go
+++ b/pkg/controller/ads/cache.go
@@ -441,6 +441,7 @@ func newApiRouteAction(action *config_route_v3.RouteAction) *route_v2.RouteActio
RetryPolicy: &route_v2.RetryPolicy{
NumRetries: action.GetRetryPolicy().GetNumRetries().GetValue(),
},
+ HashPolicy: nil,
}
switch action.GetClusterSpecifier().(type) {
@@ -467,6 +468,27 @@ func newApiRouteAction(action *config_route_v3.RouteAction) *route_v2.RouteActio
log.Errorf("newApiRouteAction default, type is %T", action.GetClusterSpecifier())
return nil
}
+ // current only support http header based hash policy
+ apiHashPolicys := make([]*route_v2.RouteAction_HashPolicy,0)
+ for _, hp := range action.GetHashPolicy() {
+ apiHashPolicy := &route_v2.RouteAction_HashPolicy{
+ PolicySpecifier: nil,
+ }
+ switch hp.GetPolicySpecifier().(type) {
+ case *config_route_v3.RouteAction_HashPolicy_Header_:
+ header := &route_v2.RouteAction_HashPolicy_Header{
+ HeaderName: hp.GetHeader().HeaderName,
+ }
+ apiHashPolicy.PolicySpecifier = &route_v2.RouteAction_HashPolicy_Header_{
+ Header: header,
+ }
+ apiHashPolicys = append(apiHashPolicys, apiHashPolicy)
+ default:
+ log.Errorf("newApiRouteAction HashPolicy default, type is %T", hp.GetPolicySpecifier())
+ return nil
+ }
+ }
+ apiAction.HashPolicy = apiHashPolicys
return apiAction
}
diff --git a/pkg/utils/hash/murmur3.go b/pkg/utils/hash/murmur3.go
new file mode 100644
index 000000000..c24a8f788
--- /dev/null
+++ b/pkg/utils/hash/murmur3.go
@@ -0,0 +1,146 @@
+// SPDX-License-Identifier: Apache-2.0
+// Copyright Authors of Cilium
+
+
+package hash
+
+
+import (
+ "unsafe"
+)
+
+
+// Hash128 calculates a 128 bits hash for the given data. It returns different
+// results when running on big-endian and little-endian machines.
+//
+// The code is intended to be as close as possible to the reference implementation
+// https://github.com/aappleby/smhasher/blob/master/src/MurmurHash3.cpp.
+func Hash128(data []byte, seed uint32) (uint64, uint64) {
+ const (
+ c1 = uint64(0x87c37b91114253d5)
+ c2 = uint64(0x4cf5ad432745937f)
+ )
+
+
+ nblocks := len(data) / 16
+
+
+ h1 := uint64(seed)
+ h2 := uint64(seed)
+
+
+ for i := 0; i < nblocks; i++ {
+ tmp := (*[2]uint64)(unsafe.Pointer(&data[i*16]))
+ k1 := tmp[0]
+ k2 := tmp[1]
+
+
+ k1 *= c1
+ k1 = rotl64(k1, 31)
+ k1 *= c2
+ h1 ^= k1
+ h1 = rotl64(h1, 27)
+ h1 += h2
+ h1 = h1*5 + 0x52dce729
+ k2 *= c2
+ k2 = rotl64(k2, 33)
+ k2 *= c1
+ h2 ^= k2
+ h2 = rotl64(h2, 31)
+ h2 += h1
+ h2 = h2*5 + 0x38495ab5
+ }
+
+
+ k1 := uint64(0)
+ k2 := uint64(0)
+
+
+ tail := data[nblocks*16:]
+
+
+ switch len(tail) & 15 {
+ case 15:
+ k2 ^= uint64(tail[14]) << 48
+ fallthrough
+ case 14:
+ k2 ^= uint64(tail[13]) << 40
+ fallthrough
+ case 13:
+ k2 ^= uint64(tail[12]) << 32
+ fallthrough
+ case 12:
+ k2 ^= uint64(tail[11]) << 24
+ fallthrough
+ case 11:
+ k2 ^= uint64(tail[10]) << 16
+ fallthrough
+ case 10:
+ k2 ^= uint64(tail[9]) << 8
+ fallthrough
+ case 9:
+ k2 ^= uint64(tail[8]) << 0
+ k2 *= c2
+ k2 = rotl64(k2, 33)
+ k2 *= c1
+ h2 ^= k2
+ fallthrough
+ case 8:
+ k1 ^= uint64(tail[7]) << 56
+ fallthrough
+ case 7:
+ k1 ^= uint64(tail[6]) << 48
+ fallthrough
+ case 6:
+ k1 ^= uint64(tail[5]) << 40
+ fallthrough
+ case 5:
+ k1 ^= uint64(tail[4]) << 32
+ fallthrough
+ case 4:
+ k1 ^= uint64(tail[3]) << 24
+ fallthrough
+ case 3:
+ k1 ^= uint64(tail[2]) << 16
+ fallthrough
+ case 2:
+ k1 ^= uint64(tail[1]) << 8
+ fallthrough
+ case 1:
+ k1 ^= uint64(tail[0]) << 0
+ k1 *= c1
+ k1 = rotl64(k1, 31)
+ k1 *= c2
+ h1 ^= k1
+ }
+
+
+ h1 ^= uint64(len(data))
+ h2 ^= uint64(len(data))
+ h1 += h2
+ h2 += h1
+ h1 = fmix64(h1)
+ h2 = fmix64(h2)
+ h1 += h2
+ h2 += h1
+
+
+ return h1, h2
+}
+
+
+func rotl64(x uint64, r int8) uint64 {
+ return (x << r) | (x >> (64 - r))
+}
+
+
+func fmix64(k uint64) uint64 {
+ k ^= k >> 33
+ k *= 0xff51afd7ed558ccd
+ k ^= k >> 33
+ k *= 0xc4ceb9fe1a85ec53
+ k ^= k >> 33
+
+
+ return k
+}
\ No newline at end of file