-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathc_hash_ring.go
99 lines (82 loc) · 2.67 KB
/
c_hash_ring.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package main
import (
"github.com/buraksezer/consistent"
"github.com/cespare/xxhash"
)
// HashRing maps keys and partitions onto named nodes arranged on a hash ring.
type HashRing interface {
	// AddNode registers node as a member of the ring.
	AddNode(node string)
	// RemoveNode removes node from the ring.
	RemoveNode(node string)

	// ResolveNode returns the name of the node that key maps to.
	ResolveNode(key string) string
	// ResolveNodes returns the names of the count nodes closest to key.
	ResolveNodes(key string, count int) []string

	// ResolvePartitionID returns the ID of the partition that key falls into.
	ResolvePartitionID(key string) int
	// ResolvePartitionOwnerNode returns the name of the node owning partitionID.
	ResolvePartitionOwnerNode(partitionID int) string

	// ReplicationFactor returns the replication factor the ring was built with.
	ReplicationFactor() int

	// ResolveNodesForPartition returns the names of the count nodes closest
	// to the partition identified by partitionID.
	ResolveNodesForPartition(partitionID, count int) []string
}
// BoundedLoadConsistentHashRing is a hash ring backed by a
// github.com/buraksezer/consistent ring.
type BoundedLoadConsistentHashRing struct {
	// ring is the underlying consistent-hashing ring that all lookups and
	// membership changes are delegated to.
	ring *consistent.Consistent

	// replicationFactor is the value reported by ReplicationFactor.
	replicationFactor int
}
// NewBoundedLoadConsistentHashRing builds a ring with no initial members and
// returns it as a HashRing.
//
// virtualNodeCount is passed to the underlying ring as its PartitionCount and
// replicationFactorForEachKey as its ReplicationFactor; the latter is also
// stored on the returned value.
//
// NOTE(review): the buraksezer/consistent documentation appears to describe
// PartitionCount as the number of partitions keys are spread over and
// ReplicationFactor as how many times each member is replicated on the ring,
// which differs from what the parameter names here suggest — confirm the
// intended meaning.
func NewBoundedLoadConsistentHashRing(virtualNodeCount, replicationFactorForEachKey int) HashRing {
	ringConfig := consistent.Config{
		Hasher:            hasher{},
		PartitionCount:    virtualNodeCount,
		ReplicationFactor: replicationFactorForEachKey,
		// A server can be 25% above the average load before the system
		// redistributes.
		Load: 1.25,
	}

	hashRing := new(BoundedLoadConsistentHashRing)
	hashRing.replicationFactor = replicationFactorForEachKey
	// Start with no members; they are registered afterwards.
	hashRing.ring = consistent.New(nil, ringConfig)
	return hashRing
}
// AddNode wraps node in the member type and adds it to the underlying ring.
func (r *BoundedLoadConsistentHashRing) AddNode(node string) {
	newMember := member(node)
	r.ring.Add(newMember)
}
// RemoveNode asks the underlying ring to remove the member named node.
func (r *BoundedLoadConsistentHashRing) RemoveNode(node string) {
	r.ring.Remove(node)
}
// ResolveNode returns the name of the node that the ring locates for key.
//
// It returns the empty string when the ring has no owner for the key (for
// example, when no node has been added yet), rather than panicking on a nil
// member.
func (r *BoundedLoadConsistentHashRing) ResolveNode(key string) string {
	owner := r.ring.LocateKey([]byte(key))
	if owner == nil {
		// Nothing owns this key's partition; calling String on a nil
		// interface value would panic.
		return ""
	}
	return owner.String()
}
// ResolveNodes asks the ring for the count members closest to key and returns
// their names. It returns nil if the ring reports an error.
// TODO: Will be used for replication.
func (r *BoundedLoadConsistentHashRing) ResolveNodes(key string, count int) []string {
	closest, err := r.ring.GetClosestN([]byte(key), count)
	if err != nil {
		return nil
	}

	names := make([]string, 0, len(closest))
	for _, candidate := range closest {
		names = append(names, candidate.String())
	}
	return names
}
// ResolvePartitionID returns the partition ID the ring finds for key.
func (r *BoundedLoadConsistentHashRing) ResolvePartitionID(key string) int {
	rawKey := []byte(key)
	return r.ring.FindPartitionID(rawKey)
}
// ResolvePartitionOwnerNode returns the name of the node that owns the
// partition identified by partitionID.
//
// It returns the empty string when the ring reports no owner for that
// partition (for example, an empty ring or an unknown partition ID), rather
// than panicking on a nil member.
func (r *BoundedLoadConsistentHashRing) ResolvePartitionOwnerNode(partitionID int) string {
	owner := r.ring.GetPartitionOwner(partitionID)
	if owner == nil {
		// No owner recorded; calling String on a nil interface value
		// would panic.
		return ""
	}
	return owner.String()
}
// ReplicationFactor returns the replication factor stored on the ring.
func (r *BoundedLoadConsistentHashRing) ReplicationFactor() int {
	factor := r.replicationFactor
	return factor
}
// ResolveNodesForPartition asks the ring for the count members closest to the
// partition identified by partitionID and returns their names. It returns nil
// if the ring reports an error.
func (r *BoundedLoadConsistentHashRing) ResolveNodesForPartition(partitionID int, count int) []string {
	closest, err := r.ring.GetClosestNForPartition(partitionID, count)
	if err != nil {
		return nil
	}

	names := make([]string, 0, len(closest))
	for _, candidate := range closest {
		names = append(names, candidate.String())
	}
	return names
}
// ------------------------ Helper types ------------------------
// hasher is the hash function handed to the ring configuration; it delegates
// to xxhash.
type hasher struct{}

// Sum64 returns the 64-bit xxhash digest of data.
func (hasher) Sum64(data []byte) uint64 {
	digest := xxhash.Sum64(data)
	return digest
}
// member is a ring member represented by its node name.
type member string

// String returns the node name as a plain string.
func (m member) String() string {
	name := string(m)
	return name
}