diff --git a/cmd/dkvsrv/main.go b/cmd/dkvsrv/main.go
index 5e264feb..5044b740 100644
--- a/cmd/dkvsrv/main.go
+++ b/cmd/dkvsrv/main.go
@@ -1,10 +1,14 @@
package main
import (
+ "encoding/json"
"flag"
"fmt"
"github.com/flipkart-incubator/dkv/internal/discovery"
+ "github.com/gorilla/mux"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
"gopkg.in/ini.v1"
+ "io/ioutil"
"log"
"net"
"net/http"
@@ -49,9 +53,10 @@ var (
vBucket string
// Node level configuration common for all regions in the node
- dbFolder string
- dbListenAddr string
- statsdAddr string
+ dbFolder string
+ dbListenAddr string
+ statsdAddr string
+ httpServerAddr string
// Service discovery related params
discoveryConf string
@@ -71,13 +76,18 @@ var (
nexusLogDirFlag, nexusSnapDirFlag *flag.Flag
- statsCli stats.Client
+ statsCli stats.Client
+ statsPublisher *stats.StatPublisher
+
+ discoveryClient discovery.Client
+ statAggregatorRegistry *stats.StatAggregatorRegistry
)
func init() {
flag.BoolVar(&disklessMode, "diskless", false, fmt.Sprintf("Enables diskless mode where data is stored entirely in memory.\nAvailable on Badger for standalone and slave roles. (default %v)", disklessMode))
flag.StringVar(&dbFolder, "db-folder", "/tmp/dkvsrv", "DB folder path for storing data files")
flag.StringVar(&dbListenAddr, "listen-addr", "0.0.0.0:8080", "Address on which the DKV service binds")
+ flag.StringVar(&httpServerAddr, "http-server-addr", "0.0.0.0:8181", "Address on which the http service binds")
flag.StringVar(&dbEngine, "db-engine", "rocksdb", "Underlying DB engine for storing data - badger|rocksdb")
flag.StringVar(&dbEngineIni, "db-engine-ini", "", "An .ini file for configuring the underlying storage engine. Refer badger.ini or rocks.ini for more details.")
flag.StringVar(&dbRole, "role", "none", "DB role of this node - none|master|slave|discovery")
@@ -119,7 +129,7 @@ func main() {
setupAccessLogger()
setFlagsForNexusDirs()
setupStats()
- printFlagsWithoutPrefix()
+ go setupHttpServer()
if pprofEnable {
go func() {
@@ -142,6 +152,7 @@ func main() {
regionInfo := &serverpb.RegionInfo{
DcID: dcID,
NodeAddress: nodeAddr.Host,
+ HttpAddress: httpServerAddr,
Database: database,
VBucket: vBucket,
Status: serverpb.RegionStatus_INACTIVE,
@@ -149,8 +160,7 @@ func main() {
NexusClusterUrl: nil,
}
- var discoveryClient discovery.Client
- if srvrRole != noRole && srvrRole != discoveryRole {
+ if srvrRole != noRole {
var err error
discoveryClient, err = newDiscoveryClient()
if err != nil {
@@ -417,6 +427,9 @@ func setupStats() {
} else {
statsCli = stats.NewNoOpClient()
}
+ statsPublisher = stats.NewStatPublisher()
+ statAggregatorRegistry = stats.NewStatAggregatorRegistry()
+ go statsPublisher.Run()
}
func newKVStore() (storage.KVStore, storage.ChangePropagator, storage.ChangeApplier, storage.Backupable) {
@@ -567,3 +580,94 @@ func nodeAddress() (*url.URL, error) {
ep := url.URL{Host: fmt.Sprintf("%s:%s", ip, port)}
return &ep, nil
}
+
+func setupHttpServer() {
+ router := mux.NewRouter()
+ router.Handle("/metrics/prometheus", promhttp.Handler())
+ router.HandleFunc("/metrics/json", jsonMetricHandler)
+ router.HandleFunc("/metrics/stream", statsStreamHandler)
+ // Should be enabled only for discovery server ?
+ router.HandleFunc("/metrics/cluster", clusterMetricsHandler)
+ http.Handle("/", router)
+ http.ListenAndServe(httpServerAddr, nil)
+}
+
+func jsonMetricHandler(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "application/json")
+ metrics, _ := stats.GetMetrics()
+ json.NewEncoder(w).Encode(metrics)
+}
+
+func statsStreamHandler(w http.ResponseWriter, r *http.Request) {
+ if f, ok := w.(http.Flusher); !ok {
+ http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
+ return
+ } else {
+ // Set the headers related to event streaming.
+ w.Header().Set("Content-Type", "text/event-stream")
+ w.Header().Set("Cache-Control", "no-cache")
+ w.Header().Set("Connection", "keep-alive")
+ w.Header().Set("Access-Control-Allow-Origin", r.Header.Get("Origin"))
+ w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")
+
+ statChannel := make(chan stats.DKVMetrics, 5)
+ channelId := statsPublisher.Register(statChannel)
+ defer func() {
+ ioutil.ReadAll(r.Body)
+ r.Body.Close()
+ }()
+ // Listen to the closing of the http connection via the CloseNotifier
+ notify := w.(http.CloseNotifier).CloseNotify()
+ for {
+ select {
+ case stat := <-statChannel:
+ statJson, _ := json.Marshal(stat)
+ fmt.Fprintf(w, "data: %s\n\n", statJson)
+ f.Flush()
+ case <-notify:
+ statsPublisher.DeRegister(channelId)
+ return
+ }
+ }
+ }
+}
+
+func clusterMetricsHandler(w http.ResponseWriter, r *http.Request) {
+ regions, err := discoveryClient.GetClusterStatus("", "")
+ if err != nil {
+ http.Error(w, "Unable to discover peers!", http.StatusInternalServerError)
+ return
+ }
+ if f, ok := w.(http.Flusher); !ok {
+ http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
+ return
+ } else {
+ // Set the headers related to event streaming.
+ w.Header().Set("Content-Type", "text/event-stream")
+ w.Header().Set("Cache-Control", "no-cache")
+ w.Header().Set("Connection", "keep-alive")
+ w.Header().Set("Access-Control-Allow-Origin", r.Header.Get("Origin"))
+ w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")
+
+ statChannel := make(chan map[string]*stats.DKVMetrics, 5)
+ channelId := statAggregatorRegistry.Register(regions, func(region *serverpb.RegionInfo) string { return region.Database }, statChannel)
+ defer func() {
+ ioutil.ReadAll(r.Body)
+ r.Body.Close()
+ }()
+ // Listen to the closing of the http connection via the CloseNotifier
+ notify := w.(http.CloseNotifier).CloseNotify()
+ for {
+ select {
+ case stat := <-statChannel:
+ statJson, _ := json.Marshal(stat)
+ fmt.Fprintf(w, "data: %s\n\n", statJson)
+ f.Flush()
+ case <-notify:
+ fmt.Println("http request closed")
+ statAggregatorRegistry.DeRegister(channelId)
+ return
+ }
+ }
+ }
+}
diff --git a/go.mod b/go.mod
index 3376b207..9668a5b4 100644
--- a/go.mod
+++ b/go.mod
@@ -15,12 +15,17 @@ require (
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.5.2
github.com/golang/snappy v0.0.2 // indirect
+ github.com/gorilla/mux v1.8.0
github.com/grpc-ecosystem/go-grpc-middleware v1.2.0
github.com/kpango/fastime v1.0.16
github.com/matttproud/golang_protobuf_extensions v1.0.1
+ github.com/onsi/ginkgo v1.12.0
+ github.com/onsi/gomega v1.9.0
github.com/pkg/errors v0.9.1 // indirect
- github.com/prometheus/client_golang v1.5.1 // indirect
+ github.com/prometheus/client_golang v1.5.1
+ github.com/prometheus/client_model v0.2.0
github.com/prometheus/procfs v0.0.10 // indirect
+ github.com/sirupsen/logrus v1.4.2
github.com/smartystreets/goconvey v1.6.4 // indirect
github.com/smira/go-statsd v1.3.1
github.com/vmihailenco/msgpack/v5 v5.3.4
diff --git a/internal/discovery/client.go b/internal/discovery/client.go
index 77e0d6f4..af28f842 100644
--- a/internal/discovery/client.go
+++ b/internal/discovery/client.go
@@ -85,7 +85,6 @@ func getDiscoveryClient(discoveryServiceAddr string) (*grpc.ClientConn, error) {
defer cancel()
return grpc.DialContext(ctx, discoveryServiceAddr,
grpc.WithInsecure(),
- grpc.WithBlock(),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)),
grpc.WithReadBufferSize(readBufSize),
grpc.WithWriteBufferSize(writeBufSize),
@@ -155,6 +154,8 @@ func (m *discoveryClient) pollClusterInfo() error {
}
}
+// gets cluster info for the provided database and vBucket
+// database and vBucket can be empty strings, in which case, the entire cluster set is returned
func (m *discoveryClient) GetClusterStatus(database string, vBucket string) ([]*serverpb.RegionInfo, error) {
if m.clusterInfo == nil {
// When called before cluster info is initialised
@@ -165,8 +166,10 @@ func (m *discoveryClient) GetClusterStatus(database string, vBucket string) ([]*
}
var regions []*serverpb.RegionInfo
for _, region := range m.clusterInfo {
- if region.Database == database && region.VBucket == vBucket {
- regions = append(regions, region)
+ if region.Database == database || database == "" {
+ if region.VBucket == vBucket || vBucket == "" {
+ regions = append(regions, region)
+ }
}
}
return regions, nil
diff --git a/internal/discovery/service.go b/internal/discovery/service.go
index 9ccfb986..4e422f38 100644
--- a/internal/discovery/service.go
+++ b/internal/discovery/service.go
@@ -120,6 +120,7 @@ func (d *discoverService) GetClusterInfo(ctx context.Context, request *serverpb.
continue
}
// Filter inactive regions and regions whose status was updated long time back and hence considered inactive
+ // This simplifies logic on consumers of this API (envoy, slaves) which don't need to filter by status
if hlc.GetTimeAgo(statusUpdate.GetTimestamp()) < d.config.HeartbeatTimeout && statusUpdate.GetRegionInfo().GetStatus() != serverpb.RegionStatus_INACTIVE {
regionsInfo = append(regionsInfo, statusUpdate.GetRegionInfo())
}
diff --git a/internal/master/service.go b/internal/master/service.go
index 6d4aa533..8506b274 100644
--- a/internal/master/service.go
+++ b/internal/master/service.go
@@ -6,10 +6,12 @@ import (
"encoding/gob"
"errors"
"fmt"
+ "github.com/prometheus/client_golang/prometheus"
"io"
"strconv"
"strings"
"sync"
+ "time"
"google.golang.org/protobuf/types/known/emptypb"
@@ -33,6 +35,28 @@ type DKVService interface {
serverpb.DKVBackupRestoreServer
}
+type dkvServiceStat struct {
+ Latency *prometheus.SummaryVec
+ ResponseError *prometheus.CounterVec
+}
+
+func newDKVServiceStat() *dkvServiceStat {
+ RequestLatency := prometheus.NewSummaryVec(prometheus.SummaryOpts{
+ Namespace: "dkv",
+ Name: "latency",
+ Help: "Latency statistics for dkv service",
+ Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
+ MaxAge: 10 * time.Second,
+ }, []string{"Ops"})
+ ResponseError := prometheus.NewCounterVec(prometheus.CounterOpts{
+ Namespace: "dkv",
+ Name: "error",
+ Help: "Error count for storage operations",
+ }, []string{"Ops"})
+ prometheus.MustRegister(RequestLatency, ResponseError)
+ return &dkvServiceStat{RequestLatency, ResponseError}
+}
+
type standaloneService struct {
store storage.KVStore
cp storage.ChangePropagator
@@ -41,6 +65,7 @@ type standaloneService struct {
rwl *sync.RWMutex
lg *zap.Logger
statsCli stats.Client
+ stat *dkvServiceStat
regionInfo *serverpb.RegionInfo
}
@@ -53,10 +78,11 @@ func (ss *standaloneService) GetStatus(ctx context.Context, request *emptypb.Emp
func NewStandaloneService(store storage.KVStore, cp storage.ChangePropagator, br storage.Backupable, lgr *zap.Logger, statsCli stats.Client, regionInfo *serverpb.RegionInfo) DKVService {
rwl := &sync.RWMutex{}
regionInfo.Status = serverpb.RegionStatus_LEADER
- return &standaloneService{store, cp, br, rwl, lgr, statsCli, regionInfo}
+ return &standaloneService{store, cp, br, rwl, lgr, statsCli, newDKVServiceStat(), regionInfo}
}
func (ss *standaloneService) Put(ctx context.Context, putReq *serverpb.PutRequest) (*serverpb.PutResponse, error) {
+ defer stats.MeasureLatency(ss.stat.Latency.WithLabelValues(stats.Put), time.Now())
ss.rwl.RLock()
defer ss.rwl.RUnlock()
if err := ss.store.Put(&serverpb.KVPair{Key: putReq.Key, Value: putReq.Value, ExpireTS: putReq.ExpireTS}); err != nil {
@@ -82,6 +108,7 @@ func (ss *standaloneService) MultiPut(ctx context.Context, putReq *serverpb.Mult
}
func (ss *standaloneService) Delete(ctx context.Context, delReq *serverpb.DeleteRequest) (*serverpb.DeleteResponse, error) {
+ defer stats.MeasureLatency(ss.stat.Latency.WithLabelValues(stats.Delete), time.Now())
ss.rwl.RLock()
defer ss.rwl.RUnlock()
@@ -93,9 +120,9 @@ func (ss *standaloneService) Delete(ctx context.Context, delReq *serverpb.Delete
}
func (ss *standaloneService) Get(ctx context.Context, getReq *serverpb.GetRequest) (*serverpb.GetResponse, error) {
+ defer stats.MeasureLatency(ss.stat.Latency.WithLabelValues(stats.Get+getReadConsistencySuffix(getReq.ReadConsistency)), time.Now())
ss.rwl.RLock()
defer ss.rwl.RUnlock()
-
readResults, err := ss.store.Get(getReq.Key)
res := &serverpb.GetResponse{Status: newEmptyStatus()}
if err != nil {
@@ -112,6 +139,7 @@ func (ss *standaloneService) Get(ctx context.Context, getReq *serverpb.GetReques
}
func (ss *standaloneService) MultiGet(ctx context.Context, multiGetReq *serverpb.MultiGetRequest) (*serverpb.MultiGetResponse, error) {
+ defer stats.MeasureLatency(ss.stat.Latency.WithLabelValues(stats.MultiGet+getReadConsistencySuffix(multiGetReq.ReadConsistency)), time.Now())
ss.rwl.RLock()
defer ss.rwl.RUnlock()
@@ -127,6 +155,7 @@ func (ss *standaloneService) MultiGet(ctx context.Context, multiGetReq *serverpb
}
func (ss *standaloneService) CompareAndSet(ctx context.Context, casReq *serverpb.CompareAndSetRequest) (*serverpb.CompareAndSetResponse, error) {
+ defer stats.MeasureLatency(ss.stat.Latency.WithLabelValues(stats.CompareAndSet), time.Now())
ss.rwl.RLock()
defer ss.rwl.RUnlock()
@@ -283,6 +312,7 @@ func (ss *standaloneService) Restore(ctx context.Context, restoreReq *serverpb.R
}
func (ss *standaloneService) Iterate(iterReq *serverpb.IterateRequest, dkvIterSrvr serverpb.DKV_IterateServer) error {
+ defer stats.MeasureLatency(ss.stat.Latency.WithLabelValues(stats.Iterate), time.Now())
ss.rwl.RLock()
defer ss.rwl.RUnlock()
@@ -320,6 +350,7 @@ type distributedService struct {
raftRepl nexus_api.RaftReplicator
lg *zap.Logger
statsCli stats.Client
+ stat *dkvServiceStat
isClosed bool
}
@@ -327,10 +358,11 @@ type distributedService struct {
// that attempts to replicate data across multiple replicas over Nexus.
func NewDistributedService(kvs storage.KVStore, cp storage.ChangePropagator, br storage.Backupable,
raftRepl nexus_api.RaftReplicator, lgr *zap.Logger, statsCli stats.Client, regionInfo *serverpb.RegionInfo) DKVClusterService {
- return &distributedService{NewStandaloneService(kvs, cp, br, lgr, statsCli, regionInfo), raftRepl, lgr, statsCli, false}
+ return &distributedService{NewStandaloneService(kvs, cp, br, lgr, statsCli, regionInfo), raftRepl, lgr, statsCli, newDKVServiceStat(), false}
}
func (ds *distributedService) Put(ctx context.Context, putReq *serverpb.PutRequest) (*serverpb.PutResponse, error) {
+ defer stats.MeasureLatency(ds.stat.Latency.WithLabelValues(stats.Put), time.Now())
reqBts, err := proto.Marshal(&raftpb.InternalRaftRequest{Put: putReq})
res := &serverpb.PutResponse{Status: newEmptyStatus()}
if err != nil {
@@ -361,6 +393,7 @@ func (ds *distributedService) MultiPut(ctx context.Context, multiPutReq *serverp
}
func (ds *distributedService) CompareAndSet(ctx context.Context, casReq *serverpb.CompareAndSetRequest) (*serverpb.CompareAndSetResponse, error) {
+ defer stats.MeasureLatency(ds.stat.Latency.WithLabelValues(stats.CompareAndSet), time.Now())
reqBts, _ := proto.Marshal(&raftpb.InternalRaftRequest{Cas: casReq})
res := &serverpb.CompareAndSetResponse{Status: newEmptyStatus()}
casRes, err := ds.raftRepl.Save(ctx, reqBts)
@@ -375,6 +408,7 @@ func (ds *distributedService) CompareAndSet(ctx context.Context, casReq *serverp
}
func (ds *distributedService) Delete(ctx context.Context, delReq *serverpb.DeleteRequest) (*serverpb.DeleteResponse, error) {
+ defer stats.MeasureLatency(ds.stat.Latency.WithLabelValues(stats.Delete), time.Now())
reqBts, err := proto.Marshal(&raftpb.InternalRaftRequest{Delete: delReq})
res := &serverpb.DeleteResponse{Status: newEmptyStatus()}
if err != nil {
@@ -390,6 +424,7 @@ func (ds *distributedService) Delete(ctx context.Context, delReq *serverpb.Delet
}
func (ds *distributedService) Get(ctx context.Context, getReq *serverpb.GetRequest) (*serverpb.GetResponse, error) {
+ defer stats.MeasureLatency(ds.stat.Latency.WithLabelValues(stats.Get+getReadConsistencySuffix(getReq.GetReadConsistency())), time.Now())
switch getReq.ReadConsistency {
case serverpb.ReadConsistency_SEQUENTIAL:
return ds.DKVService.Get(ctx, getReq)
@@ -417,6 +452,7 @@ func (ds *distributedService) Get(ctx context.Context, getReq *serverpb.GetReque
}
func (ds *distributedService) MultiGet(ctx context.Context, multiGetReq *serverpb.MultiGetRequest) (*serverpb.MultiGetResponse, error) {
+ defer stats.MeasureLatency(ds.stat.Latency.WithLabelValues(stats.MultiGet+getReadConsistencySuffix(multiGetReq.GetReadConsistency())), time.Now())
switch multiGetReq.ReadConsistency {
case serverpb.ReadConsistency_SEQUENTIAL:
return ds.DKVService.MultiGet(ctx, multiGetReq)
@@ -447,6 +483,7 @@ func gobDecodeAsKVPairs(val []byte) ([]*serverpb.KVPair, error) {
}
func (ds *distributedService) Iterate(iterReq *serverpb.IterateRequest, dkvIterSrvr serverpb.DKV_IterateServer) error {
+ defer stats.MeasureLatency(ds.stat.Latency.WithLabelValues(stats.Iterate), time.Now())
return ds.DKVService.Iterate(iterReq, dkvIterSrvr)
}
@@ -522,3 +559,13 @@ func newErrorStatus(err error) *serverpb.Status {
func newEmptyStatus() *serverpb.Status {
return &serverpb.Status{Code: 0, Message: ""}
}
+
+func getReadConsistencySuffix(rc serverpb.ReadConsistency) string {
+ switch rc {
+ case serverpb.ReadConsistency_SEQUENTIAL:
+ return "Seq"
+ case serverpb.ReadConsistency_LINEARIZABLE:
+ return "Lin"
+ }
+ return ""
+}
diff --git a/internal/slave/service.go b/internal/slave/service.go
index 941d94d8..9d965936 100644
--- a/internal/slave/service.go
+++ b/internal/slave/service.go
@@ -10,6 +10,7 @@ import (
"github.com/flipkart-incubator/dkv/internal/storage"
"github.com/flipkart-incubator/dkv/pkg/ctl"
"github.com/flipkart-incubator/dkv/pkg/serverpb"
+ "github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/emptypb"
"io"
@@ -61,11 +62,27 @@ type slaveService struct {
ca storage.ChangeApplier
lg *zap.Logger
statsCli stats.Client
+ stat *stat
regionInfo *serverpb.RegionInfo
clusterInfo discovery.ClusterInfoGetter
isClosed bool
replInfo *replInfo
}
+type stat struct {
+ ReplicationLag prometheus.Gauge
+}
+
+func newStat() *stat {
+ repliacationLag := prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "slave",
+ Name: "replication_lag",
+ Help: "replication lag of the slave",
+ })
+ prometheus.MustRegister(repliacationLag)
+ return &stat{
+ ReplicationLag: repliacationLag,
+ }
+}
// NewService creates a slave DKVService that periodically polls
// for changes from master node and replicates them onto its local
@@ -82,7 +99,7 @@ func NewService(store storage.KVStore, ca storage.ChangeApplier, lgr *zap.Logger
func newSlaveService(store storage.KVStore, ca storage.ChangeApplier, lgr *zap.Logger,
statsCli stats.Client, info *serverpb.RegionInfo, replConf *ReplicationConfig, clusterInfo discovery.ClusterInfoGetter) *slaveService {
ri := &replInfo{replConfig: replConf}
- ss := &slaveService{store: store, ca: ca, lg: lgr, statsCli: statsCli,
+ ss := &slaveService{store: store, ca: ca, lg: lgr, statsCli: statsCli, stat: newStat(),
regionInfo: info, replInfo: ri, clusterInfo: clusterInfo}
ss.findAndConnectToMaster()
ss.startReplication()
@@ -171,6 +188,7 @@ func (ss *slaveService) pollAndApplyChanges() {
case <-ss.replInfo.replTckr.C:
ss.lg.Info("Current replication lag", zap.Uint64("ReplicationLag", ss.replInfo.replLag))
ss.statsCli.Gauge("replication.lag", int64(ss.replInfo.replLag))
+ ss.stat.ReplicationLag.Set(float64(ss.replInfo.replLag))
if err := ss.applyChangesFromMaster(ss.replInfo.replConfig.MaxNumChngs); err != nil {
ss.lg.Error("Unable to retrieve changes from master", zap.Error(err))
if err := ss.replaceMasterIfInactive(); err != nil {
diff --git a/internal/stats/models.go b/internal/stats/models.go
new file mode 100644
index 00000000..a04668e1
--- /dev/null
+++ b/internal/stats/models.go
@@ -0,0 +1,113 @@
+package stats
+
+import (
+ dto "github.com/prometheus/client_model/go"
+ "time"
+)
+
+const (
+ Ops = "ops"
+ Put = "put"
+ PutTTL = "pTtl"
+ Get = "get"
+ MultiGet = "mget"
+ Delete = "del"
+ GetSnapShot = "getSnapShot"
+ PutSnapShot = "putSnapShot"
+ Iterate = "iter"
+ CompareAndSet = "cas"
+ LoadChange = "loadChange"
+ SaveChange = "saveChange"
+)
+
+type DKVMetrics struct {
+ TimeStamp int64 `json:"ts"`
+ StoreLatency map[string]*Percentile `json:"storage_latency"`
+ NexusLatency map[string]*Percentile `json:"nexus_latency"`
+ DKVLatency map[string]*Percentile `json:"dkv_latency"`
+ StorageOpsCount map[string]uint64 `json:"storage_ops_count"`
+ StorageOpsErrorCount map[string]float64 `json:"storage_ops_error_count"`
+ NexusOpsCount map[string]uint64 `json:"nexus_ops_count"`
+ DKVReqCount map[string]uint64 `json:"dkv_req_count"`
+ Count float64 `json:"count"`
+}
+
+func (dm *DKVMetrics) Merge(dm1 DKVMetrics) {
+ dm.StoreLatency = MergeMapPercentile(dm.StoreLatency, dm1.StoreLatency, dm.Count)
+ dm.NexusLatency = MergeMapPercentile(dm.NexusLatency, dm1.NexusLatency, dm.Count)
+ dm.DKVLatency = MergeMapPercentile(dm.DKVLatency, dm1.DKVLatency, dm.Count)
+ dm.StorageOpsCount = MergeMapUint64(dm.StorageOpsCount, dm1.StorageOpsCount)
+ dm.StorageOpsErrorCount = MergeMapFloat64(dm.StorageOpsErrorCount, dm.StorageOpsErrorCount)
+ dm.NexusOpsCount = MergeMapUint64(dm.NexusOpsCount, dm1.NexusOpsCount)
+ dm.DKVReqCount = MergeMapUint64(dm.DKVReqCount, dm1.DKVReqCount)
+ dm.Count = dm.Count + 1
+}
+
+func MergeMapUint64(m1, m2 map[string]uint64) map[string]uint64 {
+ for k, v := range m2 {
+ if _, exist := m1[k]; exist {
+ m1[k] = m1[k] + v
+ } else {
+ m1[k] = v
+ }
+ }
+ return m1
+}
+
+func MergeMapFloat64(m1, m2 map[string]float64) map[string]float64 {
+ for k, v := range m2 {
+ if _, exist := m1[k]; exist {
+ m1[k] = m1[k] + v
+ } else {
+ m1[k] = v
+ }
+ }
+ return m1
+}
+
+func MergeMapPercentile(m1, m2 map[string]*Percentile, count float64) map[string]*Percentile {
+ for k, v := range m2 {
+ if _, exist := m1[k]; exist {
+ m1[k].P50 = (m1[k].P50*count + m2[k].P50) / (count + 1)
+ m1[k].P90 = (m1[k].P90*count + m2[k].P90) / (count + 1)
+ m1[k].P99 = (m1[k].P99*count + m2[k].P99) / (count + 1)
+ } else {
+ m1[k] = v
+ }
+ }
+ return m1
+}
+
+type Percentile struct {
+ P50 float64 `json:"p50"`
+ P90 float64 `json:"p90"`
+ P99 float64 `json:"p99"`
+}
+
+func newDKVMetric() *DKVMetrics {
+ return &DKVMetrics{
+ TimeStamp: time.Now().Unix(),
+ StoreLatency: make(map[string]*Percentile),
+ NexusLatency: make(map[string]*Percentile),
+ DKVLatency: make(map[string]*Percentile),
+ StorageOpsCount: make(map[string]uint64),
+ StorageOpsErrorCount: make(map[string]float64),
+ NexusOpsCount: make(map[string]uint64),
+ DKVReqCount: make(map[string]uint64),
+ Count: 1,
+ }
+}
+func newPercentile(quantile []*dto.Quantile) *Percentile {
+ percentile := &Percentile{}
+ for _, q := range quantile {
+ switch *q.Quantile {
+ case 0.5:
+ percentile.P50 = *q.Value
+ case 0.9:
+ percentile.P90 = *q.Value
+ case 0.99:
+ percentile.P99 = *q.Value
+ }
+ }
+ return percentile
+}
diff --git a/internal/stats/prometheus.go b/internal/stats/prometheus.go
new file mode 100644
index 00000000..12b97720
--- /dev/null
+++ b/internal/stats/prometheus.go
@@ -0,0 +1,42 @@
+package stats
+
+import (
+ "github.com/prometheus/client_golang/prometheus"
+ "time"
+)
+
+func MeasureLatency(observer prometheus.Observer, startTime time.Time) {
+ observer.Observe(time.Since(startTime).Seconds())
+}
+
+func GetMetrics() (*DKVMetrics, error) {
+ dkvMetrics := newDKVMetric()
+ mfs, err := prometheus.DefaultGatherer.Gather()
+ if err != nil {
+ return dkvMetrics, err
+ }
+ for _, mf := range mfs {
+ switch mf.GetName() {
+ case "storage_latency":
+ for _, m := range mf.GetMetric() {
+ dkvMetrics.StoreLatency[m.Label[0].GetValue()] = newPercentile(m.GetSummary().GetQuantile())
+ dkvMetrics.StorageOpsCount[m.Label[0].GetValue()] = m.GetSummary().GetSampleCount()
+ }
+ case "nexus_latency":
+ for _, m := range mf.GetMetric() {
+ dkvMetrics.NexusLatency[m.Label[0].GetValue()] = newPercentile(m.GetSummary().GetQuantile())
+ dkvMetrics.NexusOpsCount[m.Label[0].GetValue()] = m.GetSummary().GetSampleCount()
+ }
+ case "dkv_latency":
+ for _, m := range mf.GetMetric() {
+ dkvMetrics.DKVLatency[m.Label[0].GetValue()] = newPercentile(m.GetSummary().GetQuantile())
+ dkvMetrics.DKVReqCount[m.Label[0].GetValue()] = m.GetSummary().GetSampleCount()
+ }
+ case "storage_error":
+ for _, m := range mf.GetMetric() {
+ dkvMetrics.StorageOpsErrorCount[m.Label[0].GetValue()] = m.GetCounter().GetValue()
+ }
+ }
+ }
+ return dkvMetrics, nil
+}
diff --git a/internal/stats/stat_aggregator.go b/internal/stats/stat_aggregator.go
new file mode 100644
index 00000000..1b014d40
--- /dev/null
+++ b/internal/stats/stat_aggregator.go
@@ -0,0 +1,176 @@
+package stats
+
+import (
+ "context"
+ "fmt"
+ "github.com/flipkart-incubator/dkv/pkg/serverpb"
+ "sync"
+ "time"
+)
+
+var (
+ MAX_STAT_BUFFER = 5
+)
+
+type StatAggregatorRegistry struct {
+ statsListener *StatListener
+ statAggregatorMap map[int64]*StatAggregator
+ /* Mutex for Safe Access */
+ mapMutex sync.Mutex
+}
+
+type MetricTag interface {
+ GetTag(region *serverpb.RegionInfo) string
+}
+
+func NewStatAggregatorRegistry() *StatAggregatorRegistry {
+ statsListener := NewStatListener()
+ return &StatAggregatorRegistry{statsListener: statsListener, statAggregatorMap: make(map[int64]*StatAggregator)}
+}
+
+func (sr *StatAggregatorRegistry) Register(regions []*serverpb.RegionInfo, tagger func(*serverpb.RegionInfo) string, outputChannel chan map[string]*DKVMetrics) int64 {
+ hostMap := map[string]string{}
+ for _, region := range regions {
+ hostMap[region.GetHttpAddress()] = tagger(region)
+ }
+ id := time.Now().UnixNano()
+ statAggregator := NewStatAggregator(outputChannel, hostMap)
+ go statAggregator.Start(sr.statsListener)
+
+ sr.mapMutex.Lock()
+ defer sr.mapMutex.Unlock()
+
+ sr.statAggregatorMap[id] = statAggregator
+
+ return id
+}
+
+func (sr *StatAggregatorRegistry) DeRegister(id int64) {
+ sr.mapMutex.Lock()
+ defer sr.mapMutex.Unlock()
+ if statAggregator, exist := sr.statAggregatorMap[id]; exist {
+ statAggregator.Stop()
+ delete(sr.statAggregatorMap, id)
+ }
+}
+
+type StatAggregator struct {
+ outputChannel chan map[string]*DKVMetrics
+ aggregatedStatMap map[int64]map[string]*DKVMetrics
+ hostMap map[string]string
+ channelIds map[string]int64
+ ctx context.Context
+ cancelFunc context.CancelFunc
+}
+
+func NewStatAggregator(outputChannel chan map[string]*DKVMetrics, hostMap map[string]string) *StatAggregator {
+ ctx, cancelFunc := context.WithCancel(context.Background())
+ return &StatAggregator{outputChannel: outputChannel, hostMap: hostMap, channelIds: map[string]int64{}, ctx: ctx, cancelFunc: cancelFunc}
+}
+
+func (sa *StatAggregator) Start(listener *StatListener) {
+ sa.aggregatedStatMap = make(map[int64]map[string]*DKVMetrics, MAX_STAT_BUFFER)
+
+ channels := make([]chan MetricEvent, 2)
+ for host, _ := range sa.hostMap {
+ channel := make(chan MetricEvent, MAX_STAT_BUFFER)
+ channelId, _ := listener.Register(host, channel)
+ sa.channelIds[host] = channelId
+ channels = append(channels, channel)
+ }
+ aggregatedEventChannel := sa.getMultiplexedChannel(channels)
+ for {
+ select {
+ case event := <-aggregatedEventChannel:
+ tag := sa.hostMap[event.host]
+ metric := event.metric
+
+ /* ensuring that we have upper buffer size of 5 sec */
+ if _, exist := sa.aggregatedStatMap[metric.TimeStamp]; !exist {
+ if len(sa.aggregatedStatMap) >= MAX_STAT_BUFFER {
+ index := getMinIndex(sa.aggregatedStatMap)
+ populateConsolidatedStat(sa.aggregatedStatMap[index])
+ sa.outputChannel <- sa.aggregatedStatMap[index]
+ delete(sa.aggregatedStatMap, index)
+ }
+ sa.aggregatedStatMap[metric.TimeStamp] = make(map[string]*DKVMetrics)
+ }
+
+ /* merging metrics*/
+ if _, exist := sa.aggregatedStatMap[metric.TimeStamp][tag]; !exist {
+ metric.Count = 1
+ sa.aggregatedStatMap[metric.TimeStamp][tag] = &metric
+ } else {
+ sa.aggregatedStatMap[metric.TimeStamp][tag].Merge(metric)
+
+ }
+
+ /* flushing when all metrics are aggregated */
+ if getStatCount(sa.aggregatedStatMap[metric.TimeStamp]) == len(sa.hostMap) {
+ populateConsolidatedStat(sa.aggregatedStatMap[metric.TimeStamp])
+ sa.outputChannel <- sa.aggregatedStatMap[metric.TimeStamp]
+ delete(sa.aggregatedStatMap, metric.TimeStamp)
+ }
+
+ case <-sa.ctx.Done():
+ for host, channelId := range sa.channelIds {
+ listener.DeRegister(host, channelId)
+ }
+ close(sa.outputChannel)
+ return
+ }
+ }
+}
+
+func (sa *StatAggregator) Stop() {
+ sa.cancelFunc()
+}
+
+func getStatCount(metricMap map[string]*DKVMetrics) int {
+ count := 0
+ for _, metric := range metricMap {
+ count = count + int(metric.Count)
+ }
+ return count
+}
+
+func getMinIndex(m map[int64]map[string]*DKVMetrics) int64 {
+ var min int64
+ for index, _ := range m {
+ if min == 0 || min > index {
+ min = index
+ }
+ }
+ return min
+}
+
+func populateConsolidatedStat(m map[string]*DKVMetrics) {
+ dkvMetrics := newDKVMetric()
+ for _, dm := range m {
+ /* it should contain ts of the metric that it is combining */
+ dkvMetrics.TimeStamp = dm.TimeStamp
+ dkvMetrics.Merge(*dm)
+ }
+ m["global"] = dkvMetrics
+}
+func (sa *StatAggregator) getMultiplexedChannel(channels []chan MetricEvent) chan MetricEvent {
+ /* Channel to Write Multiplexed Events */
+ aggregatedSseEvents := make(chan MetricEvent, MAX_STAT_BUFFER)
+
+ /* Start all Multiplexing Go Routines with Context */
+ for _, channel := range channels {
+ go func(evntChan chan MetricEvent) {
+ for {
+ select {
+ case <-sa.ctx.Done():
+ fmt.Println("Context Signal Received Exiting Multiplexer Routine")
+ return
+ case event := <-evntChan:
+ /* Write received event onto aggregated channel */
+ aggregatedSseEvents <- event
+ }
+ }
+ }(channel)
+ }
+ return aggregatedSseEvents
+}
diff --git a/internal/stats/stat_listener.go b/internal/stats/stat_listener.go
new file mode 100644
index 00000000..77d76764
--- /dev/null
+++ b/internal/stats/stat_listener.go
@@ -0,0 +1,181 @@
+package stats
+
+import (
+ "bufio"
+ "bytes"
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "net/http"
+ "sync"
+ "time"
+)
+
+type StatListener struct {
+ /* Time Stamp to Channel Map for Writing Output */
+ listenerInfoMap map[string]*ListenerInfo
+ /* Mutex for Safe Access */
+ mapMutex sync.Mutex
+}
+
+type MetricEvent struct {
+ metric DKVMetrics
+ host string
+}
+
+func NewStatListener() *StatListener {
+ return &StatListener{listenerInfoMap: make(map[string]*ListenerInfo)}
+}
+
+func (sl *StatListener) Register(host string, outputChannel chan MetricEvent) (int64, error) {
+ sl.mapMutex.Lock()
+ defer sl.mapMutex.Unlock()
+ if _, exists := sl.listenerInfoMap[host]; !exists {
+ newListenerInfo := NewListenerInfo(host)
+ if err := newListenerInfo.Start(); err != nil {
+ return 0, err
+ }
+ sl.listenerInfoMap[host] = newListenerInfo
+ }
+ return sl.listenerInfoMap[host].Register(outputChannel), nil
+}
+
+func (sl *StatListener) DeRegister(host string, id int64) {
+ sl.mapMutex.Lock()
+ if li, exists := sl.listenerInfoMap[host]; exists {
+ li.DeRegister(id)
+ if li.GetChannelCount() == 0 {
+ li.Stop()
+ delete(sl.listenerInfoMap, host)
+ }
+ }
+ defer sl.mapMutex.Unlock()
+}
+
+type ListenerInfo struct {
+ host string
+ outputChannelMap map[int64]chan MetricEvent
+ inputChannel <-chan MetricEvent
+ mapMutex sync.Mutex
+ ctx context.Context
+ cancelFunc context.CancelFunc
+}
+
+func NewListenerInfo(host string) *ListenerInfo {
+ ctx, cancelFunc := context.WithCancel(context.Background())
+ return &ListenerInfo{host: host, outputChannelMap: make(map[int64]chan MetricEvent, 2), inputChannel: make(<-chan MetricEvent, 5), ctx: ctx, cancelFunc: cancelFunc}
+}
+
+func (li *ListenerInfo) GetChannelCount() int {
+ return len(li.outputChannelMap)
+}
+
+func (li *ListenerInfo) Start() error {
+ var err error
+ if li.inputChannel, err = getStreamChannel(li.host, li.ctx); err == nil {
+ go li.BroadCast()
+ }
+ return err
+}
+
+func (li *ListenerInfo) BroadCast() {
+ for {
+ select {
+ case e := <-li.inputChannel:
+ li.mapMutex.Lock()
+ for _, outputChan := range li.outputChannelMap {
+ outputChan <- e
+ }
+ li.mapMutex.Unlock()
+ case <-li.ctx.Done():
+ li.mapMutex.Lock()
+ for _, outputChan := range li.outputChannelMap {
+ close(outputChan)
+ }
+ li.mapMutex.Unlock()
+ }
+ }
+}
+
+func getStreamChannel(host string, ctx context.Context) (<-chan MetricEvent, error) {
+
+ client := &http.Client{}
+ transport := &http.Transport{}
+ transport.DisableCompression = true
+ client.Transport = transport
+ request, err := http.NewRequest("GET", "http://"+host+"/metrics/stream", nil)
+ if err != nil {
+ return nil, err
+ }
+ /* Add Header to accept streaming events */
+ request.Header.Set("Accept", "text/event-stream")
+ /* Make Channel to Report Events */
+ eventChannel := make(chan MetricEvent, 5)
+ /* Ensure Request gets Cancelled along with Context */
+ requestWithContext := request.WithContext(ctx)
+ /* Fire Request */
+ if response, err := client.Do(requestWithContext); err == nil {
+ /* Open a Reader on Response Body */
+ go parseEvent(response, eventChannel, host, ctx)
+ } else {
+ return nil, err
+ }
+ return eventChannel, nil
+}
+
+func parseEvent(response *http.Response, eventChannel chan MetricEvent, host string, ctx context.Context) {
+ defer response.Body.Close()
+ br := bufio.NewReader(response.Body)
+ for {
+ select {
+ case <-ctx.Done():
+ close(eventChannel)
+ return
+ default:
+ /* Read Lines Upto Delimiter */
+ if readBytes, err := br.ReadBytes('\n'); err == nil {
+ if event, err := buildEvent(readBytes); err == nil {
+ eventChannel <- MetricEvent{metric: *event, host: host}
+ }
+ }
+ }
+ }
+}
+
+func buildEvent(byts []byte) (*DKVMetrics, error) {
+ splits := bytes.Split(byts, []byte{':', ' '})
+ if len(splits) == 2 {
+ dkvMetrics := &DKVMetrics{}
+ err := json.Unmarshal(splits[1], dkvMetrics)
+ return dkvMetrics, err
+ }
+ return nil, errors.New("Invalid Response")
+}
+
+func (li *ListenerInfo) Stop() {
+ li.cancelFunc()
+}
+
+func (li *ListenerInfo) Register(outputChannel chan MetricEvent) int64 {
+ channelId := time.Now().UnixNano()
+ li.mapMutex.Lock()
+ li.outputChannelMap[channelId] = outputChannel
+ li.mapMutex.Unlock()
+ return channelId
+}
+
+func (li *ListenerInfo) DeRegister(id int64) {
+ li.mapMutex.Lock()
+ if outputChannel, ok := li.outputChannelMap[id]; ok {
+ li.unsafeDeregister(outputChannel, id)
+ }
+ li.mapMutex.Unlock()
+}
+
+func (li *ListenerInfo) unsafeDeregister(outputChannel chan MetricEvent, id int64) {
+ /* Close Channel */
+ close(outputChannel)
+ /* Delete current Channel from Broadcast Map */
+ delete(li.outputChannelMap, id)
+}
diff --git a/internal/stats/stat_publisher.go b/internal/stats/stat_publisher.go
new file mode 100644
index 00000000..cba2ca28
--- /dev/null
+++ b/internal/stats/stat_publisher.go
@@ -0,0 +1,61 @@
+package stats
+
+import (
+ "sync"
+ "time"
+)
+
+type StatPublisher struct {
+ /* Time Stamp to Channel Map for Writing Output */
+ outputChannelMap map[int64]chan DKVMetrics
+ /* Mutex for Safe Access */
+ mapMutex sync.Mutex
+}
+
+func NewStatPublisher() *StatPublisher {
+ return &StatPublisher{
+ outputChannelMap: make(map[int64]chan DKVMetrics, 10),
+ }
+}
+
+func (sp *StatPublisher) Register(outputChannel chan DKVMetrics) int64 {
+ channelId := time.Now().UnixNano()
+ sp.mapMutex.Lock()
+ sp.outputChannelMap[channelId] = outputChannel
+ sp.mapMutex.Unlock()
+ return channelId
+}
+
+func (sp *StatPublisher) DeRegister(id int64) {
+ sp.mapMutex.Lock()
+ if outputChannel, ok := sp.outputChannelMap[id]; ok {
+ sp.unsafeDeregister(outputChannel, id)
+ }
+ sp.mapMutex.Unlock()
+}
+
+func (sp *StatPublisher) unsafeDeregister(outputChannel chan DKVMetrics, id int64) {
+ /* Close Channel */
+ close(outputChannel)
+ /* Delete current Channel from Broadcast Map */
+ delete(sp.outputChannelMap, id)
+}
+
+func (sp *StatPublisher) Run() {
+ ticker := time.NewTicker(time.Second)
+ for {
+ select {
+ case <-ticker.C:
+ dkvMetrics, _ := GetMetrics()
+ sp.mapMutex.Lock()
+ for id, outputChannel := range sp.outputChannelMap {
+ select {
+ case outputChannel <- *dkvMetrics:
+ default:
+ sp.unsafeDeregister(outputChannel, id)
+ }
+ }
+ sp.mapMutex.Unlock()
+ }
+ }
+}
diff --git a/internal/storage/badger/store.go b/internal/storage/badger/store.go
index 6337a80e..90fa64ff 100644
--- a/internal/storage/badger/store.go
+++ b/internal/storage/badger/store.go
@@ -39,6 +39,7 @@ type DB interface {
type badgerDB struct {
db *badger.DB
opts *bdgrOpts
+ stat *storage.Stat
// Indicates a global mutation like backup and restore that
// require exclusivity. Shall be manipulated using atomics.
@@ -181,7 +182,7 @@ func openStore(bdbOpts *bdgrOpts) (*badgerDB, error) {
if err != nil {
return nil, err
}
- return &badgerDB{db, bdbOpts, 0}, nil
+ return &badgerDB{db, bdbOpts, storage.NewStat(), 0}, nil
}
func (bdb *badgerDB) Close() error {
@@ -190,6 +191,7 @@ func (bdb *badgerDB) Close() error {
}
func (bdb *badgerDB) Put(pairs ...*serverpb.KVPair) error {
+ /* todo stat computation */
metricsPrefix := "badger.put.multi"
if len(pairs) == 1 {
metricsPrefix = "badger.put.single"
@@ -220,17 +222,22 @@ func (bdb *badgerDB) Put(pairs ...*serverpb.KVPair) error {
func (bdb *badgerDB) Delete(key []byte) error {
defer bdb.opts.statsCli.Timing("badger.delete.latency.ms", time.Now())
+ defer stats.MeasureLatency(bdb.stat.RequestLatency.WithLabelValues(stats.Delete), time.Now())
+
err := bdb.db.Update(func(txn *badger.Txn) error {
return txn.Delete(key)
})
if err != nil {
bdb.opts.statsCli.Incr("badger.delete.errors", 1)
+ bdb.stat.ResponseError.WithLabelValues(stats.Delete).Inc()
}
return err
}
func (bdb *badgerDB) Get(keys ...[]byte) ([]*serverpb.KVPair, error) {
defer bdb.opts.statsCli.Timing("badger.get.latency.ms", time.Now())
+ defer stats.MeasureLatency(bdb.stat.RequestLatency.WithLabelValues(stats.Get), time.Now())
+
var results []*serverpb.KVPair
err := bdb.db.View(func(txn *badger.Txn) error {
for _, key := range keys {
@@ -249,12 +256,15 @@ func (bdb *badgerDB) Get(keys ...[]byte) ([]*serverpb.KVPair, error) {
})
if err != nil {
bdb.opts.statsCli.Incr("badger.get.errors", 1)
+ bdb.stat.ResponseError.WithLabelValues(stats.Get).Inc()
}
return results, err
}
func (bdb *badgerDB) CompareAndSet(key, expect, update []byte) (bool, error) {
defer bdb.opts.statsCli.Timing("badger.cas.latency.ms", time.Now())
+ defer stats.MeasureLatency(bdb.stat.RequestLatency.WithLabelValues(stats.CompareAndSet), time.Now())
+
casTrxn := bdb.db.NewTransaction(true)
defer casTrxn.Discard()
@@ -266,6 +276,7 @@ func (bdb *badgerDB) CompareAndSet(key, expect, update []byte) (bool, error) {
}
case err != nil:
bdb.opts.statsCli.Incr("badger.cas.get.errors", 1)
+ bdb.stat.ResponseError.WithLabelValues(stats.CompareAndSet).Inc()
return false, err
default:
existVal, _ := exist.ValueCopy(nil)
@@ -276,6 +287,7 @@ func (bdb *badgerDB) CompareAndSet(key, expect, update []byte) (bool, error) {
err = casTrxn.Set(key, update)
if err != nil {
bdb.opts.statsCli.Incr("badger.cas.set.errors", 1)
+ bdb.stat.ResponseError.WithLabelValues(stats.CompareAndSet).Inc()
return false, err
}
err = casTrxn.Commit()
@@ -291,6 +303,7 @@ const (
func (bdb *badgerDB) GetSnapshot() (io.ReadCloser, error) {
defer bdb.opts.statsCli.Timing("badger.snapshot.get.latency.ms", time.Now())
+ defer stats.MeasureLatency(bdb.stat.RequestLatency.WithLabelValues(stats.GetSnapShot), time.Now())
sstFile, err := storage.CreateTempFile(bdb.opts.sstDirectory, badgerSSTPrefix)
if err != nil {
@@ -324,6 +337,7 @@ func (bdb *badgerDB) GetSnapshot() (io.ReadCloser, error) {
func (bdb *badgerDB) PutSnapshot(snap io.ReadCloser) error {
defer bdb.opts.statsCli.Timing("badger.snapshot.put.latency.ms", time.Now())
+ defer stats.MeasureLatency(bdb.stat.RequestLatency.WithLabelValues(stats.PutSnapShot), time.Now())
wb := bdb.db.NewWriteBatch()
defer wb.Cancel()
@@ -482,6 +496,8 @@ func (bdb *badgerDB) GetLatestAppliedChangeNumber() (uint64, error) {
func (bdb *badgerDB) SaveChanges(changes []*serverpb.ChangeRecord) (uint64, error) {
defer bdb.opts.statsCli.Timing("badger.save.changes.latency.ms", time.Now())
+ defer stats.MeasureLatency(bdb.stat.RequestLatency.WithLabelValues(stats.SaveChange), time.Now())
+
var appldChngNum uint64
var lastErr error
diff --git a/internal/storage/rocksdb/store.go b/internal/storage/rocksdb/store.go
index 8dfb2b34..2365ba18 100644
--- a/internal/storage/rocksdb/store.go
+++ b/internal/storage/rocksdb/store.go
@@ -41,6 +41,7 @@ type rocksDB struct {
ttlCF *gorocksdb.ColumnFamilyHandle
optimTrxnDB *gorocksdb.OptimisticTransactionDB
opts *rocksDBOpts
+ stat *storage.Stat
// Indicates a global mutation like backup and restore that
// require exclusivity. Shall be manipulated using atomics.
@@ -221,6 +222,7 @@ func openStore(opts *rocksDBOpts) (*rocksDB, error) {
optimTrxnDB: optimTrxnDB,
opts: opts,
globalMutation: 0,
+ stat: storage.NewStat(),
}
//TODO: revisit this later after understanding what is the impact of manually triggered compaction
//go rocksdb.Compaction()
@@ -323,6 +325,8 @@ func (rdb *rocksDB) Put(pairs ...*serverpb.KVPair) error {
func (rdb *rocksDB) Delete(key []byte) error {
defer rdb.opts.statsCli.Timing("rocksdb.delete.latency.ms", time.Now())
+ defer stats.MeasureLatency(rdb.stat.RequestLatency.WithLabelValues(stats.Delete), time.Now())
+
wb := gorocksdb.NewWriteBatch()
defer wb.Destroy()
wb.DeleteCF(rdb.ttlCF, key)
@@ -330,6 +334,7 @@ func (rdb *rocksDB) Delete(key []byte) error {
err := rdb.db.Write(rdb.opts.writeOpts, wb)
if err != nil {
rdb.opts.statsCli.Incr("rocksdb.delete.errors", 1)
+ rdb.stat.ResponseError.WithLabelValues(stats.Delete).Inc()
}
return err
}
@@ -346,6 +351,8 @@ func (rdb *rocksDB) Get(keys ...[]byte) ([]*serverpb.KVPair, error) {
func (rdb *rocksDB) CompareAndSet(key, expect, update []byte) (bool, error) {
defer rdb.opts.statsCli.Timing("rocksdb.cas.latency.ms", time.Now())
+ defer stats.MeasureLatency(rdb.stat.RequestLatency.WithLabelValues(stats.CompareAndSet), time.Now())
+
ro := rdb.opts.readOpts
wo := rdb.opts.writeOpts
to := gorocksdb.NewDefaultOptimisticTransactionOptions()
@@ -371,6 +378,7 @@ func (rdb *rocksDB) CompareAndSet(key, expect, update []byte) (bool, error) {
err = txn.Put(key, update)
if err != nil {
rdb.opts.statsCli.Incr("rocksdb.cas.set.errors", 1)
+ rdb.stat.ResponseError.WithLabelValues(stats.CompareAndSet).Inc()
return false, err
}
err = txn.Commit()
@@ -442,6 +450,7 @@ func (r *checkPointSnapshot) Close() error {
func (rdb *rocksDB) GetSnapshot() (io.ReadCloser, error) {
defer rdb.opts.statsCli.Timing("rocksdb.snapshot.get.latency.ms", time.Now())
+ defer stats.MeasureLatency(rdb.stat.RequestLatency.WithLabelValues(stats.GetSnapShot), time.Now())
//Prevent any other backups or restores
err := rdb.beginGlobalMutation()
@@ -505,6 +514,7 @@ func (rdb *rocksDB) PutSnapshot(snap io.ReadCloser) error {
return nil
}
defer rdb.opts.statsCli.Timing("rocksdb.snapshot.put.latency.ms", time.Now())
+ defer stats.MeasureLatency(rdb.stat.RequestLatency.WithLabelValues(stats.PutSnapShot), time.Now())
defer snap.Close()
//Prevent any other backups or restores
@@ -614,6 +624,8 @@ func (rdb *rocksDB) GetLatestCommittedChangeNumber() (uint64, error) {
func (rdb *rocksDB) LoadChanges(fromChangeNumber uint64, maxChanges int) ([]*serverpb.ChangeRecord, error) {
defer rdb.opts.statsCli.Timing("rocksdb.load.changes.latency.ms", time.Now())
+ defer stats.MeasureLatency(rdb.stat.RequestLatency.WithLabelValues(stats.LoadChange), time.Now())
+
chngIter, err := rdb.db.GetUpdatesSince(fromChangeNumber)
if err != nil {
return nil, err
@@ -636,6 +648,8 @@ func (rdb *rocksDB) GetLatestAppliedChangeNumber() (uint64, error) {
func (rdb *rocksDB) SaveChanges(changes []*serverpb.ChangeRecord) (uint64, error) {
defer rdb.opts.statsCli.Timing("rocksdb.save.changes.latency.ms", time.Now())
+ defer stats.MeasureLatency(rdb.stat.RequestLatency.WithLabelValues(stats.SaveChange), time.Now())
+
appldChngNum := uint64(0)
for _, chng := range changes {
wb := gorocksdb.WriteBatchFrom(chng.SerialisedForm)
@@ -804,9 +818,12 @@ func parseTTLMsgPackData(valueWithTTL []byte) (*ttlDataFormat, error) {
func (rdb *rocksDB) getSingleKey(ro *gorocksdb.ReadOptions, key []byte) ([]*serverpb.KVPair, error) {
defer rdb.opts.statsCli.Timing("rocksdb.single.get.latency.ms", time.Now())
+ defer stats.MeasureLatency(rdb.stat.RequestLatency.WithLabelValues(stats.Get), time.Now())
+
values, err := rdb.db.MultiGetCFMultiCF(ro, []*gorocksdb.ColumnFamilyHandle{rdb.normalCF, rdb.ttlCF}, [][]byte{key, key})
if err != nil {
rdb.opts.statsCli.Incr("rocksdb.single.get.errors", 1)
+ rdb.stat.ResponseError.WithLabelValues(stats.Get).Inc()
return nil, err
}
value1, value2 := values[0], values[1]
@@ -849,6 +866,7 @@ func (rdb *rocksDB) extractResult(value1 *gorocksdb.Slice, value2 *gorocksdb.Sli
func (rdb *rocksDB) getMultipleKeys(ro *gorocksdb.ReadOptions, keys [][]byte) ([]*serverpb.KVPair, error) {
defer rdb.opts.statsCli.Timing("rocksdb.multi.get.latency.ms", time.Now())
+ defer stats.MeasureLatency(rdb.stat.RequestLatency.WithLabelValues(stats.MultiGet), time.Now())
kl := len(keys)
reqCFs := make([]*gorocksdb.ColumnFamilyHandle, kl<<1)
@@ -860,6 +878,7 @@ func (rdb *rocksDB) getMultipleKeys(ro *gorocksdb.ReadOptions, keys [][]byte) ([
values, err := rdb.db.MultiGetCFMultiCF(ro, reqCFs, append(keys, keys...))
if err != nil {
rdb.opts.statsCli.Incr("rocksdb.multi.get.errors", 1)
+ rdb.stat.ResponseError.WithLabelValues(stats.MultiGet).Inc()
return nil, err
}
diff --git a/internal/storage/store.go b/internal/storage/store.go
index f9e3b072..bbe4e804 100644
--- a/internal/storage/store.go
+++ b/internal/storage/store.go
@@ -1,6 +1,8 @@
package storage
import (
+ "github.com/flipkart-incubator/dkv/internal/stats"
+ "github.com/prometheus/client_golang/prometheus"
"io"
"io/ioutil"
"os"
@@ -9,6 +11,28 @@ import (
"github.com/flipkart-incubator/dkv/pkg/serverpb"
)
+type Stat struct {
+ RequestLatency *prometheus.SummaryVec
+ ResponseError *prometheus.CounterVec
+}
+
+func NewStat() *Stat {
+ RequestLatency := prometheus.NewSummaryVec(prometheus.SummaryOpts{
+ Namespace: "storage",
+ Name: "latency",
+ Help: "Latency statistics for storage operations",
+ Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
+ MaxAge: 10 * time.Second,
+ }, []string{stats.Ops})
+ ResponseError := prometheus.NewCounterVec(prometheus.CounterOpts{
+ Namespace: "storage",
+ Name: "error",
+ Help: "Error count for storage operations",
+ }, []string{stats.Ops})
+ prometheus.MustRegister(RequestLatency, ResponseError)
+ return &Stat{RequestLatency, ResponseError}
+}
+
// A KVStore represents the key value store that provides
// the underlying storage implementation for the various
// DKV operations.
diff --git a/internal/www/css/main.css b/internal/www/css/main.css
new file mode 100644
index 00000000..54945c61
--- /dev/null
+++ b/internal/www/css/main.css
@@ -0,0 +1,9 @@
+.chart.c3{
+ width:33%;
+ float: left;
+}
+
+.font-bold {
+ font-weight: bold;
+ font-size: 14px;
+}
diff --git a/internal/www/header-nav.html b/internal/www/header-nav.html
new file mode 100644
index 00000000..525baf51
--- /dev/null
+++ b/internal/www/header-nav.html
@@ -0,0 +1,7 @@
+
\ No newline at end of file
diff --git a/internal/www/home.html b/internal/www/home.html
new file mode 100644
index 00000000..e64a8f8f
--- /dev/null
+++ b/internal/www/home.html
@@ -0,0 +1,66 @@
+
+
+
+
+
+
+
{{througputParser(getMapValSum(stat["dkv_req_count"]))}}
+
+
+
+
+
+
+
+
+
{{op}}
+
{{ througputParser(stat["dkv_req_count"][op]) }} rps
+
+
+
+ p50 | {{latencyParser(stat["dkv_latency"][op].p50)}} |
+ p90 | {{latencyParser(stat["dkv_latency"][op].p90)}} |
+ p99 | {{latencyParser(stat["dkv_latency"][op].p99)}} |
+
+
+
+
+
+
+
+
+
{{op}}
+
{{ througputParser(stat["dkv_req_count"][op]) }} rps
+
+
+
+ p50 | {{latencyParser(stat["dkv_latency"][op].p50)}} |
+ p90 | {{latencyParser(stat["dkv_latency"][op].p90)}} |
+ p99 | {{latencyParser(stat["dkv_latency"][op].p99)}} |
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
{{j}}
+
+
diff --git a/internal/www/index.html b/internal/www/index.html
new file mode 100644
index 00000000..728567f5
--- /dev/null
+++ b/internal/www/index.html
@@ -0,0 +1,67 @@
+
+
+
+ DKV Dashboard
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/internal/www/js/config.js b/internal/www/js/config.js
new file mode 100644
index 00000000..2a41e88d
--- /dev/null
+++ b/internal/www/js/config.js
@@ -0,0 +1,9 @@
+CONFIG={
+ "router": {
+ "Home":{
+ "path":"/",
+ "controller":"HomeCtrl",
+ "templateUrl": "home.html",
+ },
+ },
+};
diff --git a/internal/www/js/controllers.js b/internal/www/js/controllers.js
new file mode 100644
index 00000000..c0c06952
--- /dev/null
+++ b/internal/www/js/controllers.js
@@ -0,0 +1,170 @@
+(function() {
+
+
+ angular.module("DKV.Dashboard.Controllers", ['DKV.Dashboard.Services','chart.js', 'dataGrid', 'pagination', 'ngCookies' ,'ngclipboard','mgcrea.ngStrap','ngSanitize'])
+
+ .config(['ChartJsProvider', function (ChartJsProvider) {
+ ChartJsProvider.setOptions({
+ responsive: false
+ });
+ ChartJsProvider.setOptions('line', {
+ showLines: true,
+ animation: {
+ duration:0
+ },
+ legend: {
+ display: true,
+ position: 'bottom',
+ labels:{
+ boxWidth:20
+ }
+ },
+ scales: {
+ yAxes: [{
+ ticks: {
+ beginAtZero:true
+ },
+ gridLines: {
+ display:false
+ }
+ }],
+ xAxes: [{
+ type: 'time',
+ ticks: {
+ maxRotation: 0,
+ minRotation: 0,
+ autoSkip: true,
+ maxTickLimit: 3
+ },
+ gridLines: {
+ display:false
+ }
+ }]
+ },
+ tooltips: {
+ intersect:false,
+ callbacks : {
+ title: function(tooltipItems, data) {
+ return tooltipItems[0].xLabel.toLocaleTimeString();
+ }
+ }
+ },
+ elements: {
+ line: {
+ fill: false,
+ borderWidth: 1
+ },
+ point: {
+ radius: 1
+ }
+
+ }
+ });
+ }])
+
+
+ .controller("HomeCtrl", [ '$location', '$scope', '$rootScope','$timeout','$sce','DKVService',
+ function($location, $scope, $rootScope ,$timeout, $sce, DKVService) {
+ /* discover master nodes from here*/
+ $scope.masters = {
+ "m1":"http://127.0.0.1:7081",
+ "m2":"http://127.0.0.1:7082",
+ "m3":"http://127.0.0.1:7083",
+ }
+
+ $scope.througputParser = througputParser
+ $scope.latencyParser = latencyParser
+ $scope.getMapValSum = getMapValSum
+
+ $scope.ops1 = ["getLin","getSeq","mgetLin","mgetSeq",]
+ $scope.ops2 = ["put","mput","cas","del"]
+
+ $scope.source = new EventSource(DKVService.GetClusterData($scope.masters.m1));
+ $scope.source.addEventListener('message', function(e) {
+ data = JSON.parse(e.data)
+ $scope.stat = data["global"]
+ delete data["global"]
+ $scope.stats = data
+ /* initialize after the first population */
+ if ( $scope.series === undefined ) {
+ initMetric()
+ }
+
+ console.log(new Date($scope.stat.ts))
+ console.log(new Date())
+ $scope.tsEvent.push(new Date($scope.stat.ts));
+
+ for ( i = 0 ; i < $scope.series.length; i++ ) {
+ $scope.ts.rrate.data[i].push(getMapValSum($scope.stats[$scope.series[i]]["dkv_req_count"]))
+ $scope.ts.error.data[i].push(getMapValSum($scope.stats[$scope.series[i]]["dkv_req_count"]) / 100)
+ avgLatency = getAvgLatency($scope.stats[$scope.series[i]]["dkv_latency"])
+ $scope.ts.p50.data[i].push(avgLatency.p50)
+ $scope.ts.p99.data[i].push(avgLatency.p99)
+ }
+
+ if ( $scope.tsEvent.length > 60 ) {
+ $scope.tsEvent.splice(0,1);
+ for ( i = 0 ; i < $scope.series.length; i++ ) {
+ $scope.ts.rrate.data[i].splice(0,1);
+ $scope.ts.error.data[i].splice(0,1);
+ $scope.ts.p50.data[i].splice(0,1);
+ $scope.ts.p99.data[i].splice(0,1);
+ }
+ }
+
+ });
+
+ function createStat(parser,titleText) {
+ var stat = { series : $scope.series, colors: $scope.colors, label: $scope.tsEvent, data : getEmptyArray($scope.series.length),
+ options: {
+ responsive: true,
+ maintainAspectRatio: false,
+ scales: {
+ yAxes: [{
+ ticks: {
+ callback: function (value, index, values) {
+ return parser(value)
+ }
+ },
+ }]
+ },
+ title: {
+ display: true, text: titleText,
+ fontSize: 13, fontColor: "#000000"
+ },
+ legend: {
+ display: false
+ },
+ elements: {
+ line: {
+ fill: false,
+ borderWidth: 1.7
+ },
+ point: {
+ radius: 0
+ }
+
+ }
+ }
+ };
+ return stat
+ }
+ function initMetric() {
+ $scope.ts = {}
+ $scope.tsEvent = []
+ $scope.series = Object.keys($scope.stats)
+ $scope.colors = [colorGreen,colorOrange,colorBlue,colorRed,colorViolet,colorDeepBlue]
+
+ $scope.ts.rrate = createStat(througputParser,"request rate")
+ $scope.ts.error = createStat(percentageParser,"error rate")
+ $scope.ts.p50 = createStat(latencyParser,"50th percentile")
+ $scope.ts.p99 = createStat(latencyParser,"99th percentile")
+ }
+
+ window.setInterval(function(){
+ $scope.$apply(function () {
+ });
+ }, 1000);
+ }
+ ])
+}());
diff --git a/internal/www/js/directives.js b/internal/www/js/directives.js
new file mode 100644
index 00000000..104cc171
--- /dev/null
+++ b/internal/www/js/directives.js
@@ -0,0 +1,13 @@
+(function() {
+
+ angular.module("DKV.Dashboard.Directives", ["DKV.Dashboard.Controllers"])
+ .directive("headerNav", function() {
+ return {
+ restrict: "E",
+ templateUrl: "header-nav.html",
+ controller: function($scope, $rootScope, $location, $sce) {
+ $scope.config = CONFIG;
+ }
+ };
+ })
+}());
diff --git a/internal/www/js/lib.js b/internal/www/js/lib.js
new file mode 100644
index 00000000..85feb6a2
--- /dev/null
+++ b/internal/www/js/lib.js
@@ -0,0 +1,75 @@
+var througputParser = function (d) {
+ if ( d === undefined ) {
+ return 0
+ }
+ var sizes = ['', 'K', 'M', 'B', 'T'];
+ if (d < 1) return d.toFixed(1);
+ var i = Math.floor(Math.log(d) / Math.log(1000));
+ var base = d / Math.pow(1000, i);
+ if ( Math.round(base) === base ){
+ return base + ' ' + sizes[i]
+ }
+ return base.toFixed(1) + ' ' + sizes[i];
+};
+
+var latencyParser = function (d) {
+ if ( d === undefined ) {
+ return 0
+ }
+ d = d * 1000000
+ var sizes = ['µs', 'ms', 's'];
+ var i = Math.floor(Math.log(d) / Math.log(1000));
+ var base = d / Math.pow(1000, i);
+ if ( Math.round(base) === base ){
+ return base + ' ' + sizes[i]
+ }
+ return base.toFixed(1) + ' ' + sizes[i];
+};
+
+var getMapValSum = function (d) {
+ let sum = 0;
+ for (let key in d) {
+ sum += d[key];
+ }
+ return sum
+}
+
+var getAvgLatency = function (d) {
+ latency = { p50: 0 , p90 : 0 , p99 : 0}
+
+ console.log(d)
+ for (let key in d ) {
+ latency.p50 += d[key].p50
+ latency.p90 += d[key].p90
+ latency.p99 += d[key].p99
+ }
+ console.log(latency)
+
+ count = Object.keys(d).length
+ latency.p50 /= count
+ latency.p90 /= count
+ latency.p99 /= count
+
+ console.log(latency)
+ return latency
+}
+
+var percentageParser = function (d) {
+ if (Math.round(d) === d) return Math.round(d)+" %";
+ return d.toFixed(2)+" %";
+}
+
+function getEmptyArray(size){
+ var data = [];
+ for (i = 0; i < size; i++) {
+ data.push([]);
+ }
+ return data;
+}
+var colorGreen = {backgroundColor: '#62a043', borderColor: '#62a043', hoverBackgroundColor: '#62a043', hoverBorderColor: '#62a043'};
+var colorOrange = {backgroundColor: '#dc923f', borderColor: '#dc923f', hoverBackgroundColor: '#dc923f', hoverBorderColor: '#dc923f'};
+var colorBlue = {backgroundColor: '#0a9bdc', borderColor: '#0a9bdc', hoverBackgroundColor: '#0a9bdc', hoverBorderColor: '#0a9bdc'};
+var colorRed = {backgroundColor: '#bc4040', borderColor: '#bc4040', hoverBackgroundColor: '#bc4040', hoverBorderColor: '#bc4040'};
+var colorYellow = {backgroundColor: '#ffe50c', borderColor: '#ffe50c', hoverBackgroundColor: '#ffe50c', hoverBorderColor: '#ffe50c'};
+var colorViolet = {backgroundColor: '#9263ff', borderColor: '#9263ff', hoverBackgroundColor: '#9263ff', hoverBorderColor: '#9263ff'};
+var colorDeepBlue = {backgroundColor: '#0001ff', borderColor: '#0001ff', hoverBackgroundColor: '#0001ff', hoverBorderColor: '#0001ff'};
diff --git a/internal/www/js/main.js b/internal/www/js/main.js
new file mode 100644
index 00000000..75ab49d5
--- /dev/null
+++ b/internal/www/js/main.js
@@ -0,0 +1,25 @@
+(function() {
+
+ angular.module("DKV.Dashboard",['ngRoute', 'DKV.Dashboard.Services', 'DKV.Dashboard.Controllers', 'DKV.Dashboard.Directives'])
+
+ /** routes */
+ .config( function($routeProvider,$locationProvider) {
+ for (id in CONFIG.router) {
+ $routeProvider.when(CONFIG.router[id].path, {
+ templateUrl: CONFIG.router[id].templateUrl,
+ controller: CONFIG.router[id].controller
+ });
+ }
+ $routeProvider.otherwise({
+ redirectTo: "/"
+ });
+ $locationProvider.html5Mode(true);
+ })
+
+ .run(['$rootScope', '$location', '$anchorScroll', function($rootScope, $location, $anchorScroll) {
+ $rootScope.$on('$routeChangeSuccess', function(newRoute, oldRoute) {
+ if ($location.hash()) $anchorScroll();
+ });
+ }]);
+
+}());
diff --git a/internal/www/js/services.js b/internal/www/js/services.js
new file mode 100644
index 00000000..31b81464
--- /dev/null
+++ b/internal/www/js/services.js
@@ -0,0 +1,12 @@
+(function() {
+ angular.module("DKV.Dashboard.Services", [])
+
+ .factory("DKVService", [ '$http','$sce', function($http,$sce) {
+ return {
+ GetClusterData: function(endpoint) {
+ return $sce.trustAsResourceUrl(endpoint+"/metrics/cluster")
+ },
+ }
+ }])
+}());
+
diff --git a/internal/www/run.sh b/internal/www/run.sh
new file mode 100644
index 00000000..f4a4c5fb
--- /dev/null
+++ b/internal/www/run.sh
@@ -0,0 +1,3 @@
+#!/usr/bin/env bash
+
+python -m SimpleHTTPServer 8989
\ No newline at end of file
diff --git a/pkg/serverpb/admin.pb.go b/pkg/serverpb/admin.pb.go
index b07244b1..453da423 100644
--- a/pkg/serverpb/admin.pb.go
+++ b/pkg/serverpb/admin.pb.go
@@ -1039,6 +1039,9 @@ type RegionInfo struct {
// Nexus cluster url of the region
// Will be used by new followers to discover the raft cluster
NexusClusterUrl *string `protobuf:"bytes,7,opt,name=nexusClusterUrl,proto3,oneof" json:"nexusClusterUrl,omitempty"`
+ // http listener of the node.
+ // http endpoint is useful for admin API interactions
+ HttpAddress string `protobuf:"bytes,8,opt,name=httpAddress,proto3" json:"httpAddress,omitempty"`
}
func (x *RegionInfo) Reset() {
@@ -1122,6 +1125,13 @@ func (x *RegionInfo) GetNexusClusterUrl() string {
return ""
}
+func (x *RegionInfo) GetHttpAddress() string {
+ if x != nil {
+ return x.HttpAddress
+ }
+ return ""
+}
+
var File_pkg_serverpb_admin_proto protoreflect.FileDescriptor
var file_pkg_serverpb_admin_proto_rawDesc = []byte{
@@ -1235,7 +1245,7 @@ var file_pkg_serverpb_admin_proto_rawDesc = []byte{
0x12, 0x3a, 0x0a, 0x0b, 0x72, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x73, 0x18,
0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76,
0x65, 0x72, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x52,
- 0x0b, 0x72, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x73, 0x22, 0xa3, 0x02, 0x0a,
+ 0x0b, 0x72, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x73, 0x22, 0xc5, 0x02, 0x0a,
0x0a, 0x52, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x12, 0x0a, 0x04, 0x64,
0x63, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x64, 0x63, 0x49, 0x44, 0x12,
0x20, 0x0a, 0x0b, 0x6e, 0x6f, 0x64, 0x65, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x02,
@@ -1251,78 +1261,80 @@ var file_pkg_serverpb_admin_proto_rawDesc = []byte{
0x00, 0x52, 0x0a, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x48, 0x6f, 0x73, 0x74, 0x88, 0x01, 0x01,
0x12, 0x2d, 0x0a, 0x0f, 0x6e, 0x65, 0x78, 0x75, 0x73, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72,
0x55, 0x72, 0x6c, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x48, 0x01, 0x52, 0x0f, 0x6e, 0x65, 0x78,
- 0x75, 0x73, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x55, 0x72, 0x6c, 0x88, 0x01, 0x01, 0x42,
- 0x0d, 0x0a, 0x0b, 0x5f, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x48, 0x6f, 0x73, 0x74, 0x42, 0x12,
- 0x0a, 0x10, 0x5f, 0x6e, 0x65, 0x78, 0x75, 0x73, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x55,
- 0x72, 0x6c, 0x2a, 0x68, 0x0a, 0x0c, 0x52, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74,
- 0x75, 0x73, 0x12, 0x0c, 0x0a, 0x08, 0x49, 0x4e, 0x41, 0x43, 0x54, 0x49, 0x56, 0x45, 0x10, 0x00,
- 0x12, 0x0a, 0x0a, 0x06, 0x4c, 0x45, 0x41, 0x44, 0x45, 0x52, 0x10, 0x01, 0x12, 0x14, 0x0a, 0x10,
- 0x50, 0x52, 0x49, 0x4d, 0x41, 0x52, 0x59, 0x5f, 0x46, 0x4f, 0x4c, 0x4c, 0x4f, 0x57, 0x45, 0x52,
- 0x10, 0x02, 0x12, 0x16, 0x0a, 0x12, 0x53, 0x45, 0x43, 0x4f, 0x4e, 0x44, 0x41, 0x52, 0x59, 0x5f,
- 0x46, 0x4f, 0x4c, 0x4c, 0x4f, 0x57, 0x45, 0x52, 0x10, 0x03, 0x12, 0x10, 0x0a, 0x0c, 0x41, 0x43,
- 0x54, 0x49, 0x56, 0x45, 0x5f, 0x53, 0x4c, 0x41, 0x56, 0x45, 0x10, 0x04, 0x32, 0xae, 0x02, 0x0a,
- 0x0e, 0x44, 0x4b, 0x56, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12,
- 0x4f, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x12, 0x1f, 0x2e,
- 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74,
- 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20,
- 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x47, 0x65,
- 0x74, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
- 0x12, 0x39, 0x0a, 0x0a, 0x41, 0x64, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x12, 0x15,
+ 0x75, 0x73, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x55, 0x72, 0x6c, 0x88, 0x01, 0x01, 0x12,
+ 0x20, 0x0a, 0x0b, 0x68, 0x74, 0x74, 0x70, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x08,
+ 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x68, 0x74, 0x74, 0x70, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73,
+ 0x73, 0x42, 0x0d, 0x0a, 0x0b, 0x5f, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x48, 0x6f, 0x73, 0x74,
+ 0x42, 0x12, 0x0a, 0x10, 0x5f, 0x6e, 0x65, 0x78, 0x75, 0x73, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65,
+ 0x72, 0x55, 0x72, 0x6c, 0x2a, 0x68, 0x0a, 0x0c, 0x52, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x53, 0x74,
+ 0x61, 0x74, 0x75, 0x73, 0x12, 0x0c, 0x0a, 0x08, 0x49, 0x4e, 0x41, 0x43, 0x54, 0x49, 0x56, 0x45,
+ 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x4c, 0x45, 0x41, 0x44, 0x45, 0x52, 0x10, 0x01, 0x12, 0x14,
+ 0x0a, 0x10, 0x50, 0x52, 0x49, 0x4d, 0x41, 0x52, 0x59, 0x5f, 0x46, 0x4f, 0x4c, 0x4c, 0x4f, 0x57,
+ 0x45, 0x52, 0x10, 0x02, 0x12, 0x16, 0x0a, 0x12, 0x53, 0x45, 0x43, 0x4f, 0x4e, 0x44, 0x41, 0x52,
+ 0x59, 0x5f, 0x46, 0x4f, 0x4c, 0x4c, 0x4f, 0x57, 0x45, 0x52, 0x10, 0x03, 0x12, 0x10, 0x0a, 0x0c,
+ 0x41, 0x43, 0x54, 0x49, 0x56, 0x45, 0x5f, 0x53, 0x4c, 0x41, 0x56, 0x45, 0x10, 0x04, 0x32, 0xae,
+ 0x02, 0x0a, 0x0e, 0x44, 0x4b, 0x56, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f,
+ 0x6e, 0x12, 0x4f, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x12,
+ 0x1f, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x47,
+ 0x65, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
+ 0x1a, 0x20, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e,
+ 0x47, 0x65, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
+ 0x73, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x41, 0x64, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61,
+ 0x12, 0x15, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e,
+ 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x1a, 0x14, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65,
+ 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x3c, 0x0a,
+ 0x0d, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x12, 0x15,
0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x52, 0x65,
0x70, 0x6c, 0x69, 0x63, 0x61, 0x1a, 0x14, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76,
- 0x65, 0x72, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x3c, 0x0a, 0x0d, 0x52,
- 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x12, 0x15, 0x2e, 0x64,
- 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x70, 0x6c,
- 0x69, 0x63, 0x61, 0x1a, 0x14, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72,
- 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x52, 0x0a, 0x0b, 0x47, 0x65, 0x74,
- 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x73, 0x12, 0x20, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73,
- 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x70, 0x6c, 0x69,
- 0x63, 0x61, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, 0x64, 0x6b, 0x76,
+ 0x65, 0x72, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x52, 0x0a, 0x0b, 0x47,
+ 0x65, 0x74, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x73, 0x12, 0x20, 0x2e, 0x64, 0x6b, 0x76,
0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x70,
- 0x6c, 0x69, 0x63, 0x61, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x8e, 0x01,
- 0x0a, 0x10, 0x44, 0x4b, 0x56, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73, 0x74, 0x6f,
- 0x72, 0x65, 0x12, 0x3b, 0x0a, 0x06, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x12, 0x1b, 0x2e, 0x64,
- 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x42, 0x61, 0x63, 0x6b,
- 0x75, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x64, 0x6b, 0x76, 0x2e,
- 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12,
- 0x3d, 0x0a, 0x07, 0x52, 0x65, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x12, 0x1c, 0x2e, 0x64, 0x6b, 0x76,
- 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x73, 0x74, 0x6f, 0x72,
- 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73,
- 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x32, 0xd6,
- 0x01, 0x0a, 0x0a, 0x44, 0x4b, 0x56, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x12, 0x3d, 0x0a,
- 0x07, 0x41, 0x64, 0x64, 0x4e, 0x6f, 0x64, 0x65, 0x12, 0x1c, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73,
- 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x41, 0x64, 0x64, 0x4e, 0x6f, 0x64, 0x65, 0x52,
- 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72,
- 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x43, 0x0a, 0x0a,
- 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x12, 0x1f, 0x2e, 0x64, 0x6b, 0x76,
- 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65,
- 0x4e, 0x6f, 0x64, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x64, 0x6b,
- 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75,
- 0x73, 0x12, 0x44, 0x0a, 0x09, 0x4c, 0x69, 0x73, 0x74, 0x4e, 0x6f, 0x64, 0x65, 0x73, 0x12, 0x16,
- 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66,
- 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1f, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72,
- 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4e, 0x6f, 0x64, 0x65, 0x73, 0x52,
- 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0xb4, 0x01, 0x0a, 0x0c, 0x44, 0x4b, 0x56, 0x44,
- 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x12, 0x47, 0x0a, 0x0c, 0x55, 0x70, 0x64, 0x61,
- 0x74, 0x65, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x21, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73,
- 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x53, 0x74,
- 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x64, 0x6b,
+ 0x6c, 0x69, 0x63, 0x61, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, 0x64,
+ 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x52,
+ 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32,
+ 0x8e, 0x01, 0x0a, 0x10, 0x44, 0x4b, 0x56, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73,
+ 0x74, 0x6f, 0x72, 0x65, 0x12, 0x3b, 0x0a, 0x06, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x12, 0x1b,
+ 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x42, 0x61,
+ 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x64, 0x6b,
0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75,
- 0x73, 0x12, 0x5b, 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49,
- 0x6e, 0x66, 0x6f, 0x12, 0x23, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72,
- 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x6e, 0x66,
- 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73,
- 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74,
- 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x51,
- 0x0a, 0x10, 0x44, 0x4b, 0x56, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x4e, 0x6f,
- 0x64, 0x65, 0x12, 0x3d, 0x0a, 0x09, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12,
- 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
- 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x18, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65,
- 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66,
- 0x6f, 0x42, 0x30, 0x5a, 0x2e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f,
- 0x66, 0x6c, 0x69, 0x70, 0x6b, 0x61, 0x72, 0x74, 0x2d, 0x69, 0x6e, 0x63, 0x75, 0x62, 0x61, 0x74,
- 0x6f, 0x72, 0x2f, 0x64, 0x6b, 0x76, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x65,
- 0x72, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+ 0x73, 0x12, 0x3d, 0x0a, 0x07, 0x52, 0x65, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x12, 0x1c, 0x2e, 0x64,
+ 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x73, 0x74,
+ 0x6f, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x64, 0x6b, 0x76,
+ 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73,
+ 0x32, 0xd6, 0x01, 0x0a, 0x0a, 0x44, 0x4b, 0x56, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x12,
+ 0x3d, 0x0a, 0x07, 0x41, 0x64, 0x64, 0x4e, 0x6f, 0x64, 0x65, 0x12, 0x1c, 0x2e, 0x64, 0x6b, 0x76,
+ 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x41, 0x64, 0x64, 0x4e, 0x6f, 0x64,
+ 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73,
+ 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x43,
+ 0x0a, 0x0a, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x12, 0x1f, 0x2e, 0x64,
+ 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x6d, 0x6f,
+ 0x76, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e,
+ 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61,
+ 0x74, 0x75, 0x73, 0x12, 0x44, 0x0a, 0x09, 0x4c, 0x69, 0x73, 0x74, 0x4e, 0x6f, 0x64, 0x65, 0x73,
+ 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
+ 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1f, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73,
+ 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4e, 0x6f, 0x64, 0x65,
+ 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0xb4, 0x01, 0x0a, 0x0c, 0x44, 0x4b,
+ 0x56, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x12, 0x47, 0x0a, 0x0c, 0x55, 0x70,
+ 0x64, 0x61, 0x74, 0x65, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x21, 0x2e, 0x64, 0x6b, 0x76,
+ 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65,
+ 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e,
+ 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61,
+ 0x74, 0x75, 0x73, 0x12, 0x5b, 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65,
+ 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x23, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76,
+ 0x65, 0x72, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49,
+ 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x64, 0x6b, 0x76,
+ 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6c, 0x75,
+ 0x73, 0x74, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
+ 0x32, 0x51, 0x0a, 0x10, 0x44, 0x4b, 0x56, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79,
+ 0x4e, 0x6f, 0x64, 0x65, 0x12, 0x3d, 0x0a, 0x09, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75,
+ 0x73, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
+ 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x18, 0x2e, 0x64, 0x6b, 0x76, 0x2e,
+ 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x49,
+ 0x6e, 0x66, 0x6f, 0x42, 0x30, 0x5a, 0x2e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f,
+ 0x6d, 0x2f, 0x66, 0x6c, 0x69, 0x70, 0x6b, 0x61, 0x72, 0x74, 0x2d, 0x69, 0x6e, 0x63, 0x75, 0x62,
+ 0x61, 0x74, 0x6f, 0x72, 0x2f, 0x64, 0x6b, 0x76, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x73, 0x65, 0x72,
+ 0x76, 0x65, 0x72, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
diff --git a/pkg/serverpb/admin.proto b/pkg/serverpb/admin.proto
index 6d290c8e..e07eb8e8 100644
--- a/pkg/serverpb/admin.proto
+++ b/pkg/serverpb/admin.proto
@@ -189,6 +189,9 @@ message RegionInfo {
// Nexus cluster url of the region
// Will be used by new followers to discover the raft cluster
optional string nexusClusterUrl = 7;
+ // http listener of the node.
+ // http endpoint is useful for admin API interactions
+ string httpAddress = 8;
}
enum RegionStatus {