-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathd_api.go
130 lines (108 loc) · 3.89 KB
/
d_api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package main
import (
"encoding/json"
"fmt"
"github.com/charmbracelet/log"
"net/http"
"strconv"
"github.com/gorilla/mux"
)
var httpLogger = log.WithPrefix("http")
type HttpAPIServer struct {
router *mux.Router
distKV *DistKVServer
httpPort int
}
func NewAPI(distKV *DistKVServer, httpPort int) *HttpAPIServer {
api := &HttpAPIServer{
router: mux.NewRouter(),
distKV: distKV,
httpPort: httpPort,
}
// ::::::::External endpoints below::::::::
// These are quorum endpoints. It is the endpoint that external clients should use.
api.router.HandleFunc("/kv/{key}/{value}", api.setKvHandler).Methods("POST")
api.router.HandleFunc("/kv/{key}", api.getKvHandler).Methods("GET")
// ::::::::Internal endpoints below::::::::
// These are direct store endpoints. It is not quorum based. It is used to avoid recursive forwarding.
// If we were to use quorum endpoint, it will trigger a quorum request to the same node which is not necessary.
// We can modify the message to have like flag to indicate if it is quorum request or not.
// But for simplicity, we are using different endpoints.
api.router.HandleFunc("/store/{key}/{value}", api.setStoreHandler).Methods("POST")
api.router.HandleFunc("/store/{key}", api.getStoreHandler).Methods("GET")
// These are shard endpoints. It is not quorum based.
api.router.HandleFunc("/shards/{key}", api.setShardHandler).Methods("POST")
api.router.HandleFunc("/shards", api.getShardHandler).Methods("GET")
return api
}
func (api *HttpAPIServer) Run() {
addr := fmt.Sprintf(":%d", api.httpPort)
_ = http.ListenAndServe(addr, api.router)
}
func (api *HttpAPIServer) GetAddress() string {
return fmt.Sprintf("%s:%d", GetLocalIP(), api.httpPort)
}
func (api *HttpAPIServer) getKvHandler(w http.ResponseWriter, r *http.Request) {
key := mux.Vars(r)["key"]
routeNodeAddress := api.distKV.ring.ResolveNode(key)
value, err := api.distKV.client.Get(key)
if err != nil {
httpLogger.Errorf("Error forwarding request to %s: %v", routeNodeAddress, err)
http.Error(w, "Error forwarding request", http.StatusInternalServerError)
return
}
_, _ = w.Write([]byte(value))
return
}
func (api *HttpAPIServer) setKvHandler(w http.ResponseWriter, r *http.Request) {
key := mux.Vars(r)["key"]
value := mux.Vars(r)["value"]
routeNodeAddress := api.distKV.ring.ResolveNode(key)
err := api.distKV.client.Put(key, value)
if err != nil {
httpLogger.Errorf("Error forwarding request to %s: %v", routeNodeAddress, err)
http.Error(w, "Error forwarding request", http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
}
func (api *HttpAPIServer) getStoreHandler(w http.ResponseWriter, r *http.Request) {
key := mux.Vars(r)["key"]
shardId := api.distKV.ring.ResolvePartitionID(key)
value, ok := api.distKV.store.Get(shardId, key)
if !ok {
http.Error(w, "Key not found", http.StatusNotFound)
return
}
_, _ = w.Write([]byte(value))
}
func (api *HttpAPIServer) setStoreHandler(w http.ResponseWriter, r *http.Request) {
key := mux.Vars(r)["key"]
value := mux.Vars(r)["value"]
shardId := api.distKV.ring.ResolvePartitionID(key)
api.distKV.store.Set(shardId, key, value)
w.WriteHeader(http.StatusOK)
}
func (api *HttpAPIServer) getShardHandler(w http.ResponseWriter, r *http.Request) {
shards := api.distKV.store.GetShards()
for shardId, shard := range shards {
httpLogger.Warnf("Shard %d", shardId)
shard.Range(func(key, value interface{}) bool {
httpLogger.Warnf("Key: %s, Value: %s", key, value)
return true
})
}
w.WriteHeader(http.StatusOK)
}
func (api *HttpAPIServer) setShardHandler(w http.ResponseWriter, r *http.Request) {
shardIdStr := mux.Vars(r)["key"]
shardId, _ := strconv.Atoi(shardIdStr)
var shard map[string]string
if err := json.NewDecoder(r.Body).Decode(&shard); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
defer r.Body.Close()
api.distKV.store.SetShard(shardId, shard)
w.WriteHeader(http.StatusOK)
}