diff --git a/cmd/dkvsrv/main.go b/cmd/dkvsrv/main.go index 5e264feb..5044b740 100644 --- a/cmd/dkvsrv/main.go +++ b/cmd/dkvsrv/main.go @@ -1,10 +1,14 @@ package main import ( + "encoding/json" "flag" "fmt" "github.com/flipkart-incubator/dkv/internal/discovery" + "github.com/gorilla/mux" + "github.com/prometheus/client_golang/prometheus/promhttp" "gopkg.in/ini.v1" + "io/ioutil" "log" "net" "net/http" @@ -49,9 +53,10 @@ var ( vBucket string // Node level configuration common for all regions in the node - dbFolder string - dbListenAddr string - statsdAddr string + dbFolder string + dbListenAddr string + statsdAddr string + httpServerAddr string // Service discovery related params discoveryConf string @@ -71,13 +76,18 @@ var ( nexusLogDirFlag, nexusSnapDirFlag *flag.Flag - statsCli stats.Client + statsCli stats.Client + statsPublisher *stats.StatPublisher + + discoveryClient discovery.Client + statAggregatorRegistry *stats.StatAggregatorRegistry ) func init() { flag.BoolVar(&disklessMode, "diskless", false, fmt.Sprintf("Enables diskless mode where data is stored entirely in memory.\nAvailable on Badger for standalone and slave roles. (default %v)", disklessMode)) flag.StringVar(&dbFolder, "db-folder", "/tmp/dkvsrv", "DB folder path for storing data files") flag.StringVar(&dbListenAddr, "listen-addr", "0.0.0.0:8080", "Address on which the DKV service binds") + flag.StringVar(&httpServerAddr, "http-server-addr", "0.0.0.0:8181", "Address on which the http service binds") flag.StringVar(&dbEngine, "db-engine", "rocksdb", "Underlying DB engine for storing data - badger|rocksdb") flag.StringVar(&dbEngineIni, "db-engine-ini", "", "An .ini file for configuring the underlying storage engine. Refer badger.ini or rocks.ini for more details.") flag.StringVar(&dbRole, "role", "none", "DB role of this node - none|master|slave|discovery") @@ -119,7 +129,7 @@ func main() { setupAccessLogger() setFlagsForNexusDirs() setupStats() - printFlagsWithoutPrefix() + go setupHttpServer() if pprofEnable { go func() { @@ -142,6 +152,7 @@ func main() { regionInfo := &serverpb.RegionInfo{ DcID: dcID, NodeAddress: nodeAddr.Host, + HttpAddress: httpServerAddr, Database: database, VBucket: vBucket, Status: serverpb.RegionStatus_INACTIVE, @@ -149,8 +160,7 @@ func main() { NexusClusterUrl: nil, } - var discoveryClient discovery.Client - if srvrRole != noRole && srvrRole != discoveryRole { + if srvrRole != noRole { var err error discoveryClient, err = newDiscoveryClient() if err != nil { @@ -417,6 +427,9 @@ func setupStats() { } else { statsCli = stats.NewNoOpClient() } + statsPublisher = stats.NewStatPublisher() + statAggregatorRegistry = stats.NewStatAggregatorRegistry() + go statsPublisher.Run() } func newKVStore() (storage.KVStore, storage.ChangePropagator, storage.ChangeApplier, storage.Backupable) { @@ -567,3 +580,94 @@ func nodeAddress() (*url.URL, error) { ep := url.URL{Host: fmt.Sprintf("%s:%s", ip, port)} return &ep, nil } + +func setupHttpServer() { + router := mux.NewRouter() + router.Handle("/metrics/prometheus", promhttp.Handler()) + router.HandleFunc("/metrics/json", jsonMetricHandler) + router.HandleFunc("/metrics/stream", statsStreamHandler) + // Should be enabled only for discovery server ? + router.HandleFunc("/metrics/cluster", clusterMetricsHandler) + http.Handle("/", router) + http.ListenAndServe(httpServerAddr, nil) +} + +func jsonMetricHandler(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + metrics, _ := stats.GetMetrics() + json.NewEncoder(w).Encode(metrics) +} + +func statsStreamHandler(w http.ResponseWriter, r *http.Request) { + if f, ok := w.(http.Flusher); !ok { + http.Error(w, "Streaming unsupported!", http.StatusInternalServerError) + return + } else { + // Set the headers related to event streaming. + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Access-Control-Allow-Origin", r.Header.Get("Origin")) + w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE") + + statChannel := make(chan stats.DKVMetrics, 5) + channelId := statsPublisher.Register(statChannel) + defer func() { + ioutil.ReadAll(r.Body) + r.Body.Close() + }() + // Listen to the closing of the http connection via the CloseNotifier + notify := w.(http.CloseNotifier).CloseNotify() + for { + select { + case stat := <-statChannel: + statJson, _ := json.Marshal(stat) + fmt.Fprintf(w, "data: %s\n\n", statJson) + f.Flush() + case <-notify: + statsPublisher.DeRegister(channelId) + return + } + } + } +} + +func clusterMetricsHandler(w http.ResponseWriter, r *http.Request) { + regions, err := discoveryClient.GetClusterStatus("", "") + if err != nil { + http.Error(w, "Unable to discover peers!", http.StatusInternalServerError) + return + } + if f, ok := w.(http.Flusher); !ok { + http.Error(w, "Streaming unsupported!", http.StatusInternalServerError) + return + } else { + // Set the headers related to event streaming. + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Access-Control-Allow-Origin", r.Header.Get("Origin")) + w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE") + + statChannel := make(chan map[string]*stats.DKVMetrics, 5) + channelId := statAggregatorRegistry.Register(regions, func(region *serverpb.RegionInfo) string { return region.Database }, statChannel) + defer func() { + ioutil.ReadAll(r.Body) + r.Body.Close() + }() + // Listen to the closing of the http connection via the CloseNotifier + notify := w.(http.CloseNotifier).CloseNotify() + for { + select { + case stat := <-statChannel: + statJson, _ := json.Marshal(stat) + fmt.Fprintf(w, "data: %s\n\n", statJson) + f.Flush() + case <-notify: + fmt.Println("http request closed") + statAggregatorRegistry.DeRegister(channelId) + return + } + } + } +} diff --git a/go.mod b/go.mod index 3376b207..9668a5b4 100644 --- a/go.mod +++ b/go.mod @@ -15,12 +15,17 @@ require ( github.com/gogo/protobuf v1.3.1 github.com/golang/protobuf v1.5.2 github.com/golang/snappy v0.0.2 // indirect + github.com/gorilla/mux v1.8.0 github.com/grpc-ecosystem/go-grpc-middleware v1.2.0 github.com/kpango/fastime v1.0.16 github.com/matttproud/golang_protobuf_extensions v1.0.1 + github.com/onsi/ginkgo v1.12.0 + github.com/onsi/gomega v1.9.0 github.com/pkg/errors v0.9.1 // indirect - github.com/prometheus/client_golang v1.5.1 // indirect + github.com/prometheus/client_golang v1.5.1 + github.com/prometheus/client_model v0.2.0 github.com/prometheus/procfs v0.0.10 // indirect + github.com/sirupsen/logrus v1.4.2 github.com/smartystreets/goconvey v1.6.4 // indirect github.com/smira/go-statsd v1.3.1 github.com/vmihailenco/msgpack/v5 v5.3.4 diff --git a/internal/discovery/client.go b/internal/discovery/client.go index 77e0d6f4..af28f842 100644 --- a/internal/discovery/client.go +++ b/internal/discovery/client.go @@ -85,7 +85,6 @@ func getDiscoveryClient(discoveryServiceAddr string) (*grpc.ClientConn, error) { defer cancel() return grpc.DialContext(ctx, discoveryServiceAddr, grpc.WithInsecure(), - grpc.WithBlock(), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)), grpc.WithReadBufferSize(readBufSize), grpc.WithWriteBufferSize(writeBufSize), @@ -155,6 +154,8 @@ func (m *discoveryClient) pollClusterInfo() error { } } +// gets cluster info for the provided database and vBucket +// database and vBucket can be empty strings, in which case, the entire cluster set is returned func (m *discoveryClient) GetClusterStatus(database string, vBucket string) ([]*serverpb.RegionInfo, error) { if m.clusterInfo == nil { // When called before cluster info is initialised @@ -165,8 +166,10 @@ func (m *discoveryClient) GetClusterStatus(database string, vBucket string) ([]* } var regions []*serverpb.RegionInfo for _, region := range m.clusterInfo { - if region.Database == database && region.VBucket == vBucket { - regions = append(regions, region) + if region.Database == database || database == "" { + if region.VBucket == vBucket || vBucket == "" { + regions = append(regions, region) + } } } return regions, nil diff --git a/internal/discovery/service.go b/internal/discovery/service.go index 9ccfb986..4e422f38 100644 --- a/internal/discovery/service.go +++ b/internal/discovery/service.go @@ -120,6 +120,7 @@ func (d *discoverService) GetClusterInfo(ctx context.Context, request *serverpb. continue } // Filter inactive regions and regions whose status was updated long time back and hence considered inactive + // This simplifies logic on consumers of this API (envoy, slaves) which don't need to filter by status if hlc.GetTimeAgo(statusUpdate.GetTimestamp()) < d.config.HeartbeatTimeout && statusUpdate.GetRegionInfo().GetStatus() != serverpb.RegionStatus_INACTIVE { regionsInfo = append(regionsInfo, statusUpdate.GetRegionInfo()) } diff --git a/internal/master/service.go b/internal/master/service.go index 6d4aa533..8506b274 100644 --- a/internal/master/service.go +++ b/internal/master/service.go @@ -6,10 +6,12 @@ import ( "encoding/gob" "errors" "fmt" + "github.com/prometheus/client_golang/prometheus" "io" "strconv" "strings" "sync" + "time" "google.golang.org/protobuf/types/known/emptypb" @@ -33,6 +35,28 @@ type DKVService interface { serverpb.DKVBackupRestoreServer } +type dkvServiceStat struct { + Latency *prometheus.SummaryVec + ResponseError *prometheus.CounterVec +} + +func newDKVServiceStat() *dkvServiceStat { + RequestLatency := prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Namespace: "dkv", + Name: "latency", + Help: "Latency statistics for dkv service", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + MaxAge: 10 * time.Second, + }, []string{"Ops"}) + ResponseError := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "dkv", + Name: "error", + Help: "Error count for storage operations", + }, []string{"Ops"}) + prometheus.MustRegister(RequestLatency, ResponseError) + return &dkvServiceStat{RequestLatency, ResponseError} +} + type standaloneService struct { store storage.KVStore cp storage.ChangePropagator @@ -41,6 +65,7 @@ type standaloneService struct { rwl *sync.RWMutex lg *zap.Logger statsCli stats.Client + stat *dkvServiceStat regionInfo *serverpb.RegionInfo } @@ -53,10 +78,11 @@ func (ss *standaloneService) GetStatus(ctx context.Context, request *emptypb.Emp func NewStandaloneService(store storage.KVStore, cp storage.ChangePropagator, br storage.Backupable, lgr *zap.Logger, statsCli stats.Client, regionInfo *serverpb.RegionInfo) DKVService { rwl := &sync.RWMutex{} regionInfo.Status = serverpb.RegionStatus_LEADER - return &standaloneService{store, cp, br, rwl, lgr, statsCli, regionInfo} + return &standaloneService{store, cp, br, rwl, lgr, statsCli, newDKVServiceStat(), regionInfo} } func (ss *standaloneService) Put(ctx context.Context, putReq *serverpb.PutRequest) (*serverpb.PutResponse, error) { + defer stats.MeasureLatency(ss.stat.Latency.WithLabelValues(stats.Put), time.Now()) ss.rwl.RLock() defer ss.rwl.RUnlock() if err := ss.store.Put(&serverpb.KVPair{Key: putReq.Key, Value: putReq.Value, ExpireTS: putReq.ExpireTS}); err != nil { @@ -82,6 +108,7 @@ func (ss *standaloneService) MultiPut(ctx context.Context, putReq *serverpb.Mult } func (ss *standaloneService) Delete(ctx context.Context, delReq *serverpb.DeleteRequest) (*serverpb.DeleteResponse, error) { + defer stats.MeasureLatency(ss.stat.Latency.WithLabelValues(stats.Delete), time.Now()) ss.rwl.RLock() defer ss.rwl.RUnlock() @@ -93,9 +120,9 @@ func (ss *standaloneService) Delete(ctx context.Context, delReq *serverpb.Delete } func (ss *standaloneService) Get(ctx context.Context, getReq *serverpb.GetRequest) (*serverpb.GetResponse, error) { + defer stats.MeasureLatency(ss.stat.Latency.WithLabelValues(stats.Get+getReadConsistencySuffix(getReq.ReadConsistency)), time.Now()) ss.rwl.RLock() defer ss.rwl.RUnlock() - readResults, err := ss.store.Get(getReq.Key) res := &serverpb.GetResponse{Status: newEmptyStatus()} if err != nil { @@ -112,6 +139,7 @@ func (ss *standaloneService) Get(ctx context.Context, getReq *serverpb.GetReques } func (ss *standaloneService) MultiGet(ctx context.Context, multiGetReq *serverpb.MultiGetRequest) (*serverpb.MultiGetResponse, error) { + defer stats.MeasureLatency(ss.stat.Latency.WithLabelValues(stats.MultiGet+getReadConsistencySuffix(multiGetReq.ReadConsistency)), time.Now()) ss.rwl.RLock() defer ss.rwl.RUnlock() @@ -127,6 +155,7 @@ func (ss *standaloneService) MultiGet(ctx context.Context, multiGetReq *serverpb } func (ss *standaloneService) CompareAndSet(ctx context.Context, casReq *serverpb.CompareAndSetRequest) (*serverpb.CompareAndSetResponse, error) { + defer stats.MeasureLatency(ss.stat.Latency.WithLabelValues(stats.CompareAndSet), time.Now()) ss.rwl.RLock() defer ss.rwl.RUnlock() @@ -283,6 +312,7 @@ func (ss *standaloneService) Restore(ctx context.Context, restoreReq *serverpb.R } func (ss *standaloneService) Iterate(iterReq *serverpb.IterateRequest, dkvIterSrvr serverpb.DKV_IterateServer) error { + defer stats.MeasureLatency(ss.stat.Latency.WithLabelValues(stats.Iterate), time.Now()) ss.rwl.RLock() defer ss.rwl.RUnlock() @@ -320,6 +350,7 @@ type distributedService struct { raftRepl nexus_api.RaftReplicator lg *zap.Logger statsCli stats.Client + stat *dkvServiceStat isClosed bool } @@ -327,10 +358,11 @@ type distributedService struct { // that attempts to replicate data across multiple replicas over Nexus. func NewDistributedService(kvs storage.KVStore, cp storage.ChangePropagator, br storage.Backupable, raftRepl nexus_api.RaftReplicator, lgr *zap.Logger, statsCli stats.Client, regionInfo *serverpb.RegionInfo) DKVClusterService { - return &distributedService{NewStandaloneService(kvs, cp, br, lgr, statsCli, regionInfo), raftRepl, lgr, statsCli, false} + return &distributedService{NewStandaloneService(kvs, cp, br, lgr, statsCli, regionInfo), raftRepl, lgr, statsCli, newDKVServiceStat(), false} } func (ds *distributedService) Put(ctx context.Context, putReq *serverpb.PutRequest) (*serverpb.PutResponse, error) { + defer stats.MeasureLatency(ds.stat.Latency.WithLabelValues(stats.Put), time.Now()) reqBts, err := proto.Marshal(&raftpb.InternalRaftRequest{Put: putReq}) res := &serverpb.PutResponse{Status: newEmptyStatus()} if err != nil { @@ -361,6 +393,7 @@ func (ds *distributedService) MultiPut(ctx context.Context, multiPutReq *serverp } func (ds *distributedService) CompareAndSet(ctx context.Context, casReq *serverpb.CompareAndSetRequest) (*serverpb.CompareAndSetResponse, error) { + defer stats.MeasureLatency(ds.stat.Latency.WithLabelValues(stats.CompareAndSet), time.Now()) reqBts, _ := proto.Marshal(&raftpb.InternalRaftRequest{Cas: casReq}) res := &serverpb.CompareAndSetResponse{Status: newEmptyStatus()} casRes, err := ds.raftRepl.Save(ctx, reqBts) @@ -375,6 +408,7 @@ func (ds *distributedService) CompareAndSet(ctx context.Context, casReq *serverp } func (ds *distributedService) Delete(ctx context.Context, delReq *serverpb.DeleteRequest) (*serverpb.DeleteResponse, error) { + defer stats.MeasureLatency(ds.stat.Latency.WithLabelValues(stats.Delete), time.Now()) reqBts, err := proto.Marshal(&raftpb.InternalRaftRequest{Delete: delReq}) res := &serverpb.DeleteResponse{Status: newEmptyStatus()} if err != nil { @@ -390,6 +424,7 @@ func (ds *distributedService) Delete(ctx context.Context, delReq *serverpb.Delet } func (ds *distributedService) Get(ctx context.Context, getReq *serverpb.GetRequest) (*serverpb.GetResponse, error) { + defer stats.MeasureLatency(ds.stat.Latency.WithLabelValues(stats.Get+getReadConsistencySuffix(getReq.GetReadConsistency())), time.Now()) switch getReq.ReadConsistency { case serverpb.ReadConsistency_SEQUENTIAL: return ds.DKVService.Get(ctx, getReq) @@ -417,6 +452,7 @@ func (ds *distributedService) Get(ctx context.Context, getReq *serverpb.GetReque } func (ds *distributedService) MultiGet(ctx context.Context, multiGetReq *serverpb.MultiGetRequest) (*serverpb.MultiGetResponse, error) { + defer stats.MeasureLatency(ds.stat.Latency.WithLabelValues(stats.MultiGet+getReadConsistencySuffix(multiGetReq.GetReadConsistency())), time.Now()) switch multiGetReq.ReadConsistency { case serverpb.ReadConsistency_SEQUENTIAL: return ds.DKVService.MultiGet(ctx, multiGetReq) @@ -447,6 +483,7 @@ func gobDecodeAsKVPairs(val []byte) ([]*serverpb.KVPair, error) { } func (ds *distributedService) Iterate(iterReq *serverpb.IterateRequest, dkvIterSrvr serverpb.DKV_IterateServer) error { + defer stats.MeasureLatency(ds.stat.Latency.WithLabelValues(stats.Iterate), time.Now()) return ds.DKVService.Iterate(iterReq, dkvIterSrvr) } @@ -522,3 +559,13 @@ func newErrorStatus(err error) *serverpb.Status { func newEmptyStatus() *serverpb.Status { return &serverpb.Status{Code: 0, Message: ""} } + +func getReadConsistencySuffix(rc serverpb.ReadConsistency) string { + switch rc { + case serverpb.ReadConsistency_SEQUENTIAL: + return "Seq" + case serverpb.ReadConsistency_LINEARIZABLE: + return "Lin" + } + return "" +} diff --git a/internal/slave/service.go b/internal/slave/service.go index 941d94d8..9d965936 100644 --- a/internal/slave/service.go +++ b/internal/slave/service.go @@ -10,6 +10,7 @@ import ( "github.com/flipkart-incubator/dkv/internal/storage" "github.com/flipkart-incubator/dkv/pkg/ctl" "github.com/flipkart-incubator/dkv/pkg/serverpb" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" "google.golang.org/protobuf/types/known/emptypb" "io" @@ -61,11 +62,27 @@ type slaveService struct { ca storage.ChangeApplier lg *zap.Logger statsCli stats.Client + stat *stat regionInfo *serverpb.RegionInfo clusterInfo discovery.ClusterInfoGetter isClosed bool replInfo *replInfo } +type stat struct { + ReplicationLag prometheus.Gauge +} + +func newStat() *stat { + repliacationLag := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "slave", + Name: "replication_lag", + Help: "replication lag of the slave", + }) + prometheus.MustRegister(repliacationLag) + return &stat{ + ReplicationLag: repliacationLag, + } +} // NewService creates a slave DKVService that periodically polls // for changes from master node and replicates them onto its local @@ -82,7 +99,7 @@ func NewService(store storage.KVStore, ca storage.ChangeApplier, lgr *zap.Logger func newSlaveService(store storage.KVStore, ca storage.ChangeApplier, lgr *zap.Logger, statsCli stats.Client, info *serverpb.RegionInfo, replConf *ReplicationConfig, clusterInfo discovery.ClusterInfoGetter) *slaveService { ri := &replInfo{replConfig: replConf} - ss := &slaveService{store: store, ca: ca, lg: lgr, statsCli: statsCli, + ss := &slaveService{store: store, ca: ca, lg: lgr, statsCli: statsCli, stat: newStat(), regionInfo: info, replInfo: ri, clusterInfo: clusterInfo} ss.findAndConnectToMaster() ss.startReplication() @@ -171,6 +188,7 @@ func (ss *slaveService) pollAndApplyChanges() { case <-ss.replInfo.replTckr.C: ss.lg.Info("Current replication lag", zap.Uint64("ReplicationLag", ss.replInfo.replLag)) ss.statsCli.Gauge("replication.lag", int64(ss.replInfo.replLag)) + ss.stat.ReplicationLag.Set(float64(ss.replInfo.replLag)) if err := ss.applyChangesFromMaster(ss.replInfo.replConfig.MaxNumChngs); err != nil { ss.lg.Error("Unable to retrieve changes from master", zap.Error(err)) if err := ss.replaceMasterIfInactive(); err != nil { diff --git a/internal/stats/models.go b/internal/stats/models.go new file mode 100644 index 00000000..a04668e1 --- /dev/null +++ b/internal/stats/models.go @@ -0,0 +1,113 @@ +package stats + +import ( + dto "github.com/prometheus/client_model/go" + "time" +) + +const ( + Ops = "ops" + Put = "put" + PutTTL = "pTtl" + Get = "get" + MultiGet = "mget" + Delete = "del" + GetSnapShot = "getSnapShot" + PutSnapShot = "putSnapShot" + Iterate = "iter" + CompareAndSet = "cas" + LoadChange = "loadChange" + SaveChange = "saveChange" +) + +type DKVMetrics struct { + TimeStamp int64 `json:"ts"` + StoreLatency map[string]*Percentile `json:"storage_latency"` + NexusLatency map[string]*Percentile `json:"nexus_latency"` + DKVLatency map[string]*Percentile `json:"dkv_latency"` + StorageOpsCount map[string]uint64 `json:"storage_ops_count"` + StorageOpsErrorCount map[string]float64 `json:"storage_ops_error_count"` + NexusOpsCount map[string]uint64 `json:"nexus_ops_count"` + DKVReqCount map[string]uint64 `json:"dkv_req_count"` + Count float64 `json:"count"` +} + +func (dm *DKVMetrics) Merge(dm1 DKVMetrics) { + dm.StoreLatency = MergeMapPercentile(dm.StoreLatency, dm1.StoreLatency, dm.Count) + dm.NexusLatency = MergeMapPercentile(dm.NexusLatency, dm1.NexusLatency, dm.Count) + dm.DKVLatency = MergeMapPercentile(dm.DKVLatency, dm1.DKVLatency, dm.Count) + dm.StorageOpsCount = MergeMapUint64(dm.StorageOpsCount, dm1.StorageOpsCount) + dm.StorageOpsErrorCount = MergeMapFloat64(dm.StorageOpsErrorCount, dm.StorageOpsErrorCount) + dm.NexusOpsCount = MergeMapUint64(dm.NexusOpsCount, dm1.NexusOpsCount) + dm.DKVReqCount = MergeMapUint64(dm.DKVReqCount, dm1.DKVReqCount) + dm.Count = dm.Count + 1 +} + +func MergeMapUint64(m1, m2 map[string]uint64) map[string]uint64 { + for k, v := range m2 { + if _, exist := m1[k]; exist { + m1[k] = m1[k] + v + } else { + m1[k] = v + } + } + return m1 +} + +func MergeMapFloat64(m1, m2 map[string]float64) map[string]float64 { + for k, v := range m2 { + if _, exist := m1[k]; exist { + m1[k] = m1[k] + v + } else { + m1[k] = v + } + } + return m1 +} + +func MergeMapPercentile(m1, m2 map[string]*Percentile, count float64) map[string]*Percentile { + for k, v := range m2 { + if _, exist := m1[k]; exist { + m1[k].P50 = (m1[k].P50*count + m2[k].P50) / (count + 1) + m1[k].P90 = (m1[k].P90*count + m2[k].P90) / (count + 1) + m1[k].P99 = (m1[k].P99*count + m2[k].P99) / (count + 1) + } else { + m1[k] = v + } + } + return m1 +} + +type Percentile struct { + P50 float64 `json:"p50"` + P90 float64 `json:"p90"` + P99 float64 `json:"p99"` +} + +func newDKVMetric() *DKVMetrics { + return &DKVMetrics{ + TimeStamp: time.Now().Unix(), + StoreLatency: make(map[string]*Percentile), + NexusLatency: make(map[string]*Percentile), + DKVLatency: make(map[string]*Percentile), + StorageOpsCount: make(map[string]uint64), + StorageOpsErrorCount: make(map[string]float64), + NexusOpsCount: make(map[string]uint64), + DKVReqCount: make(map[string]uint64), + Count: 1, + } +} +func newPercentile(quantile []*dto.Quantile) *Percentile { + percentile := &Percentile{} + for _, q := range quantile { + switch *q.Quantile { + case 0.5: + percentile.P50 = *q.Value + case 0.9: + percentile.P90 = *q.Value + case 0.99: + percentile.P99 = *q.Value + } + } + return percentile +} diff --git a/internal/stats/prometheus.go b/internal/stats/prometheus.go new file mode 100644 index 00000000..12b97720 --- /dev/null +++ b/internal/stats/prometheus.go @@ -0,0 +1,42 @@ +package stats + +import ( + "github.com/prometheus/client_golang/prometheus" + "time" +) + +func MeasureLatency(observer prometheus.Observer, startTime time.Time) { + observer.Observe(time.Since(startTime).Seconds()) +} + +func GetMetrics() (*DKVMetrics, error) { + dkvMetrics := newDKVMetric() + mfs, err := prometheus.DefaultGatherer.Gather() + if err != nil { + return dkvMetrics, err + } + for _, mf := range mfs { + switch mf.GetName() { + case "storage_latency": + for _, m := range mf.GetMetric() { + dkvMetrics.StoreLatency[m.Label[0].GetValue()] = newPercentile(m.GetSummary().GetQuantile()) + dkvMetrics.StorageOpsCount[m.Label[0].GetValue()] = m.GetSummary().GetSampleCount() + } + case "nexus_latency": + for _, m := range mf.GetMetric() { + dkvMetrics.NexusLatency[m.Label[0].GetValue()] = newPercentile(m.GetSummary().GetQuantile()) + dkvMetrics.NexusOpsCount[m.Label[0].GetValue()] = m.GetSummary().GetSampleCount() + } + case "dkv_latency": + for _, m := range mf.GetMetric() { + dkvMetrics.DKVLatency[m.Label[0].GetValue()] = newPercentile(m.GetSummary().GetQuantile()) + dkvMetrics.DKVReqCount[m.Label[0].GetValue()] = m.GetSummary().GetSampleCount() + } + case "storage_error": + for _, m := range mf.GetMetric() { + dkvMetrics.StorageOpsErrorCount[m.Label[0].GetValue()] = m.GetCounter().GetValue() + } + } + } + return dkvMetrics, nil +} diff --git a/internal/stats/stat_aggregator.go b/internal/stats/stat_aggregator.go new file mode 100644 index 00000000..1b014d40 --- /dev/null +++ b/internal/stats/stat_aggregator.go @@ -0,0 +1,176 @@ +package stats + +import ( + "context" + "fmt" + "github.com/flipkart-incubator/dkv/pkg/serverpb" + "sync" + "time" +) + +var ( + MAX_STAT_BUFFER = 5 +) + +type StatAggregatorRegistry struct { + statsListener *StatListener + statAggregatorMap map[int64]*StatAggregator + /* Mutex for Safe Access */ + mapMutex sync.Mutex +} + +type MetricTag interface { + GetTag(region *serverpb.RegionInfo) string +} + +func NewStatAggregatorRegistry() *StatAggregatorRegistry { + statsListener := NewStatListener() + return &StatAggregatorRegistry{statsListener: statsListener, statAggregatorMap: make(map[int64]*StatAggregator)} +} + +func (sr *StatAggregatorRegistry) Register(regions []*serverpb.RegionInfo, tagger func(*serverpb.RegionInfo) string, outputChannel chan map[string]*DKVMetrics) int64 { + hostMap := map[string]string{} + for _, region := range regions { + hostMap[region.GetHttpAddress()] = tagger(region) + } + id := time.Now().UnixNano() + statAggregator := NewStatAggregator(outputChannel, hostMap) + go statAggregator.Start(sr.statsListener) + + sr.mapMutex.Lock() + defer sr.mapMutex.Unlock() + + sr.statAggregatorMap[id] = statAggregator + + return id +} + +func (sr *StatAggregatorRegistry) DeRegister(id int64) { + sr.mapMutex.Lock() + defer sr.mapMutex.Unlock() + if statAggregator, exist := sr.statAggregatorMap[id]; exist { + statAggregator.Stop() + delete(sr.statAggregatorMap, id) + } +} + +type StatAggregator struct { + outputChannel chan map[string]*DKVMetrics + aggregatedStatMap map[int64]map[string]*DKVMetrics + hostMap map[string]string + channelIds map[string]int64 + ctx context.Context + cancelFunc context.CancelFunc +} + +func NewStatAggregator(outputChannel chan map[string]*DKVMetrics, hostMap map[string]string) *StatAggregator { + ctx, cancelFunc := context.WithCancel(context.Background()) + return &StatAggregator{outputChannel: outputChannel, hostMap: hostMap, channelIds: map[string]int64{}, ctx: ctx, cancelFunc: cancelFunc} +} + +func (sa *StatAggregator) Start(listener *StatListener) { + sa.aggregatedStatMap = make(map[int64]map[string]*DKVMetrics, MAX_STAT_BUFFER) + + channels := make([]chan MetricEvent, 2) + for host, _ := range sa.hostMap { + channel := make(chan MetricEvent, MAX_STAT_BUFFER) + channelId, _ := listener.Register(host, channel) + sa.channelIds[host] = channelId + channels = append(channels, channel) + } + aggregatedEventChannel := sa.getMultiplexedChannel(channels) + for { + select { + case event := <-aggregatedEventChannel: + tag := sa.hostMap[event.host] + metric := event.metric + + /* ensuring that we have upper buffer size of 5 sec */ + if _, exist := sa.aggregatedStatMap[metric.TimeStamp]; !exist { + if len(sa.aggregatedStatMap) >= MAX_STAT_BUFFER { + index := getMinIndex(sa.aggregatedStatMap) + populateConsolidatedStat(sa.aggregatedStatMap[index]) + sa.outputChannel <- sa.aggregatedStatMap[index] + delete(sa.aggregatedStatMap, index) + } + sa.aggregatedStatMap[metric.TimeStamp] = make(map[string]*DKVMetrics) + } + + /* merging metrics*/ + if _, exist := sa.aggregatedStatMap[metric.TimeStamp][tag]; !exist { + metric.Count = 1 + sa.aggregatedStatMap[metric.TimeStamp][tag] = &metric + } else { + sa.aggregatedStatMap[metric.TimeStamp][tag].Merge(metric) + + } + + /* flushing when all metrics are aggregated */ + if getStatCount(sa.aggregatedStatMap[metric.TimeStamp]) == len(sa.hostMap) { + populateConsolidatedStat(sa.aggregatedStatMap[metric.TimeStamp]) + sa.outputChannel <- sa.aggregatedStatMap[metric.TimeStamp] + delete(sa.aggregatedStatMap, metric.TimeStamp) + } + + case <-sa.ctx.Done(): + for host, channelId := range sa.channelIds { + listener.DeRegister(host, channelId) + } + close(sa.outputChannel) + return + } + } +} + +func (sa *StatAggregator) Stop() { + sa.cancelFunc() +} + +func getStatCount(metricMap map[string]*DKVMetrics) int { + count := 0 + for _, metric := range metricMap { + count = count + int(metric.Count) + } + return count +} + +func getMinIndex(m map[int64]map[string]*DKVMetrics) int64 { + var min int64 + for index, _ := range m { + if min == 0 || min > index { + min = index + } + } + return min +} + +func populateConsolidatedStat(m map[string]*DKVMetrics) { + dkvMetrics := newDKVMetric() + for _, dm := range m { + /* it should contain ts of the metric that it is combining */ + dkvMetrics.TimeStamp = dm.TimeStamp + dkvMetrics.Merge(*dm) + } + m["global"] = dkvMetrics +} +func (sa *StatAggregator) getMultiplexedChannel(channels []chan MetricEvent) chan MetricEvent { + /* Channel to Write Multiplexed Events */ + aggregatedSseEvents := make(chan MetricEvent, MAX_STAT_BUFFER) + + /* Start all Multiplexing Go Routines with Context */ + for _, channel := range channels { + go func(evntChan chan MetricEvent) { + for { + select { + case <-sa.ctx.Done(): + fmt.Println("Context Signal Received Exiting Multiplexer Routine") + return + case event := <-evntChan: + /* Write received event onto aggregated channel */ + aggregatedSseEvents <- event + } + } + }(channel) + } + return aggregatedSseEvents +} diff --git a/internal/stats/stat_listener.go b/internal/stats/stat_listener.go new file mode 100644 index 00000000..77d76764 --- /dev/null +++ b/internal/stats/stat_listener.go @@ -0,0 +1,181 @@ +package stats + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "sync" + "time" +) + +type StatListener struct { + /* Time Stamp to Channel Map for Writing Output */ + listenerInfoMap map[string]*ListenerInfo + /* Mutex for Safe Access */ + mapMutex sync.Mutex +} + +type MetricEvent struct { + metric DKVMetrics + host string +} + +func NewStatListener() *StatListener { + return &StatListener{listenerInfoMap: make(map[string]*ListenerInfo)} +} + +func (sl *StatListener) Register(host string, outputChannel chan MetricEvent) (int64, error) { + sl.mapMutex.Lock() + defer sl.mapMutex.Unlock() + if _, exists := sl.listenerInfoMap[host]; !exists { + newListenerInfo := NewListenerInfo(host) + if err := newListenerInfo.Start(); err != nil { + return 0, err + } + sl.listenerInfoMap[host] = newListenerInfo + } + return sl.listenerInfoMap[host].Register(outputChannel), nil +} + +func (sl *StatListener) DeRegister(host string, id int64) { + sl.mapMutex.Lock() + if li, exists := sl.listenerInfoMap[host]; exists { + li.DeRegister(id) + if li.GetChannelCount() == 0 { + li.Stop() + delete(sl.listenerInfoMap, host) + } + } + defer sl.mapMutex.Unlock() +} + +type ListenerInfo struct { + host string + outputChannelMap map[int64]chan MetricEvent + inputChannel <-chan MetricEvent + mapMutex sync.Mutex + ctx context.Context + cancelFunc context.CancelFunc +} + +func NewListenerInfo(host string) *ListenerInfo { + ctx, cancelFunc := context.WithCancel(context.Background()) + return &ListenerInfo{host: host, outputChannelMap: make(map[int64]chan MetricEvent, 2), inputChannel: make(<-chan MetricEvent, 5), ctx: ctx, cancelFunc: cancelFunc} +} + +func (li *ListenerInfo) GetChannelCount() int { + return len(li.outputChannelMap) +} + +func (li *ListenerInfo) Start() error { + var err error + if li.inputChannel, err = getStreamChannel(li.host, li.ctx); err == nil { + go li.BroadCast() + } + return err +} + +func (li *ListenerInfo) BroadCast() { + for { + select { + case e := <-li.inputChannel: + li.mapMutex.Lock() + for _, outputChan := range li.outputChannelMap { + outputChan <- e + } + li.mapMutex.Unlock() + case <-li.ctx.Done(): + li.mapMutex.Lock() + for _, outputChan := range li.outputChannelMap { + close(outputChan) + } + li.mapMutex.Unlock() + } + } +} + +func getStreamChannel(host string, ctx context.Context) (<-chan MetricEvent, error) { + + client := &http.Client{} + transport := &http.Transport{} + transport.DisableCompression = true + client.Transport = transport + request, err := http.NewRequest("GET", "http://"+host+"/metrics/stream", nil) + if err != nil { + return nil, err + } + /* Add Header to accept streaming events */ + request.Header.Set("Accept", "text/event-stream") + /* Make Channel to Report Events */ + eventChannel := make(chan MetricEvent, 5) + /* Ensure Request gets Cancelled along with Context */ + requestWithContext := request.WithContext(ctx) + /* Fire Request */ + if response, err := client.Do(requestWithContext); err == nil { + /* Open a Reader on Response Body */ + go parseEvent(response, eventChannel, host, ctx) + } else { + return nil, err + } + return eventChannel, nil +} + +func parseEvent(response *http.Response, eventChannel chan MetricEvent, host string, ctx context.Context) { + defer response.Body.Close() + br := bufio.NewReader(response.Body) + for { + select { + case <-ctx.Done(): + close(eventChannel) + return + default: + /* Read Lines Upto Delimiter */ + if readBytes, err := br.ReadBytes('\n'); err == nil { + if event, err := buildEvent(readBytes); err == nil { + eventChannel <- MetricEvent{metric: *event, host: host} + } + } + } + } +} + +func buildEvent(byts []byte) (*DKVMetrics, error) { + splits := bytes.Split(byts, []byte{':', ' '}) + if len(splits) == 2 { + dkvMetrics := &DKVMetrics{} + err := json.Unmarshal(splits[1], dkvMetrics) + return dkvMetrics, err + } + return nil, errors.New("Invalid Response") +} + +func (li *ListenerInfo) Stop() { + li.cancelFunc() +} + +func (li *ListenerInfo) Register(outputChannel chan MetricEvent) int64 { + channelId := time.Now().UnixNano() + li.mapMutex.Lock() + li.outputChannelMap[channelId] = outputChannel + li.mapMutex.Unlock() + return channelId +} + +func (li *ListenerInfo) DeRegister(id int64) { + li.mapMutex.Lock() + if outputChannel, ok := li.outputChannelMap[id]; ok { + li.unsafeDeregister(outputChannel, id) + } + li.mapMutex.Unlock() +} + +func (li *ListenerInfo) unsafeDeregister(outputChannel chan MetricEvent, id int64) { + /* Close Channel */ + close(outputChannel) + /* Delete current Channel from Broadcast Map */ + delete(li.outputChannelMap, id) +} diff --git a/internal/stats/stat_publisher.go b/internal/stats/stat_publisher.go new file mode 100644 index 00000000..cba2ca28 --- /dev/null +++ b/internal/stats/stat_publisher.go @@ -0,0 +1,61 @@ +package stats + +import ( + "sync" + "time" +) + +type StatPublisher struct { + /* Time Stamp to Channel Map for Writing Output */ + outputChannelMap map[int64]chan DKVMetrics + /* Mutex for Safe Access */ + mapMutex sync.Mutex +} + +func NewStatPublisher() *StatPublisher { + return &StatPublisher{ + outputChannelMap: make(map[int64]chan DKVMetrics, 10), + } +} + +func (sp *StatPublisher) Register(outputChannel chan DKVMetrics) int64 { + channelId := time.Now().UnixNano() + sp.mapMutex.Lock() + sp.outputChannelMap[channelId] = outputChannel + sp.mapMutex.Unlock() + return channelId +} + +func (sp *StatPublisher) DeRegister(id int64) { + sp.mapMutex.Lock() + if outputChannel, ok := sp.outputChannelMap[id]; ok { + sp.unsafeDeregister(outputChannel, id) + } + sp.mapMutex.Unlock() +} + +func (sp *StatPublisher) unsafeDeregister(outputChannel chan DKVMetrics, id int64) { + /* Close Channel */ + close(outputChannel) + /* Delete current Channel from Broadcast Map */ + delete(sp.outputChannelMap, id) +} + +func (sp *StatPublisher) Run() { + ticker := time.NewTicker(time.Second) + for { + select { + case <-ticker.C: + dkvMetrics, _ := GetMetrics() + sp.mapMutex.Lock() + for id, outputChannel := range sp.outputChannelMap { + select { + case outputChannel <- *dkvMetrics: + default: + sp.unsafeDeregister(outputChannel, id) + } + } + sp.mapMutex.Unlock() + } + } +} diff --git a/internal/storage/badger/store.go b/internal/storage/badger/store.go index 6337a80e..90fa64ff 100644 --- a/internal/storage/badger/store.go +++ b/internal/storage/badger/store.go @@ -39,6 +39,7 @@ type DB interface { type badgerDB struct { db *badger.DB opts *bdgrOpts + stat *storage.Stat // Indicates a global mutation like backup and restore that // require exclusivity. Shall be manipulated using atomics. @@ -181,7 +182,7 @@ func openStore(bdbOpts *bdgrOpts) (*badgerDB, error) { if err != nil { return nil, err } - return &badgerDB{db, bdbOpts, 0}, nil + return &badgerDB{db, bdbOpts, storage.NewStat(), 0}, nil } func (bdb *badgerDB) Close() error { @@ -190,6 +191,7 @@ func (bdb *badgerDB) Close() error { } func (bdb *badgerDB) Put(pairs ...*serverpb.KVPair) error { + /* todo stat computation */ metricsPrefix := "badger.put.multi" if len(pairs) == 1 { metricsPrefix = "badger.put.single" @@ -220,17 +222,22 @@ func (bdb *badgerDB) Put(pairs ...*serverpb.KVPair) error { func (bdb *badgerDB) Delete(key []byte) error { defer bdb.opts.statsCli.Timing("badger.delete.latency.ms", time.Now()) + defer stats.MeasureLatency(bdb.stat.RequestLatency.WithLabelValues(stats.Delete), time.Now()) + err := bdb.db.Update(func(txn *badger.Txn) error { return txn.Delete(key) }) if err != nil { bdb.opts.statsCli.Incr("badger.delete.errors", 1) + bdb.stat.ResponseError.WithLabelValues(stats.Delete).Inc() } return err } func (bdb *badgerDB) Get(keys ...[]byte) ([]*serverpb.KVPair, error) { defer bdb.opts.statsCli.Timing("badger.get.latency.ms", time.Now()) + defer stats.MeasureLatency(bdb.stat.RequestLatency.WithLabelValues(stats.Get), time.Now()) + var results []*serverpb.KVPair err := bdb.db.View(func(txn *badger.Txn) error { for _, key := range keys { @@ -249,12 +256,15 @@ func (bdb *badgerDB) Get(keys ...[]byte) ([]*serverpb.KVPair, error) { }) if err != nil { bdb.opts.statsCli.Incr("badger.get.errors", 1) + bdb.stat.ResponseError.WithLabelValues(stats.Get).Inc() } return results, err } func (bdb *badgerDB) CompareAndSet(key, expect, update []byte) (bool, error) { defer bdb.opts.statsCli.Timing("badger.cas.latency.ms", time.Now()) + defer stats.MeasureLatency(bdb.stat.RequestLatency.WithLabelValues(stats.CompareAndSet), time.Now()) + casTrxn := bdb.db.NewTransaction(true) defer casTrxn.Discard() @@ -266,6 +276,7 @@ func (bdb *badgerDB) CompareAndSet(key, expect, update []byte) (bool, error) { } case err != nil: bdb.opts.statsCli.Incr("badger.cas.get.errors", 1) + bdb.stat.ResponseError.WithLabelValues(stats.CompareAndSet).Inc() return false, err default: existVal, _ := exist.ValueCopy(nil) @@ -276,6 +287,7 @@ func (bdb *badgerDB) CompareAndSet(key, expect, update []byte) (bool, error) { err = casTrxn.Set(key, update) if err != nil { bdb.opts.statsCli.Incr("badger.cas.set.errors", 1) + bdb.stat.ResponseError.WithLabelValues(stats.CompareAndSet).Inc() return false, err } err = casTrxn.Commit() @@ -291,6 +303,7 @@ const ( func (bdb *badgerDB) GetSnapshot() (io.ReadCloser, error) { defer bdb.opts.statsCli.Timing("badger.snapshot.get.latency.ms", time.Now()) + defer stats.MeasureLatency(bdb.stat.RequestLatency.WithLabelValues(stats.GetSnapShot), time.Now()) sstFile, err := storage.CreateTempFile(bdb.opts.sstDirectory, badgerSSTPrefix) if err != nil { @@ -324,6 +337,7 @@ func (bdb *badgerDB) GetSnapshot() (io.ReadCloser, error) { func (bdb *badgerDB) PutSnapshot(snap io.ReadCloser) error { defer bdb.opts.statsCli.Timing("badger.snapshot.put.latency.ms", time.Now()) + defer stats.MeasureLatency(bdb.stat.RequestLatency.WithLabelValues(stats.PutSnapShot), time.Now()) wb := bdb.db.NewWriteBatch() defer wb.Cancel() @@ -482,6 +496,8 @@ func (bdb *badgerDB) GetLatestAppliedChangeNumber() (uint64, error) { func (bdb *badgerDB) SaveChanges(changes []*serverpb.ChangeRecord) (uint64, error) { defer bdb.opts.statsCli.Timing("badger.save.changes.latency.ms", time.Now()) + defer stats.MeasureLatency(bdb.stat.RequestLatency.WithLabelValues(stats.SaveChange), time.Now()) + var appldChngNum uint64 var lastErr error diff --git a/internal/storage/rocksdb/store.go b/internal/storage/rocksdb/store.go index 8dfb2b34..2365ba18 100644 --- a/internal/storage/rocksdb/store.go +++ b/internal/storage/rocksdb/store.go @@ -41,6 +41,7 @@ type rocksDB struct { ttlCF *gorocksdb.ColumnFamilyHandle optimTrxnDB *gorocksdb.OptimisticTransactionDB opts *rocksDBOpts + stat *storage.Stat // Indicates a global mutation like backup and restore that // require exclusivity. Shall be manipulated using atomics. @@ -221,6 +222,7 @@ func openStore(opts *rocksDBOpts) (*rocksDB, error) { optimTrxnDB: optimTrxnDB, opts: opts, globalMutation: 0, + stat: storage.NewStat(), } //TODO: revisit this later after understanding what is the impact of manually triggered compaction //go rocksdb.Compaction() @@ -323,6 +325,8 @@ func (rdb *rocksDB) Put(pairs ...*serverpb.KVPair) error { func (rdb *rocksDB) Delete(key []byte) error { defer rdb.opts.statsCli.Timing("rocksdb.delete.latency.ms", time.Now()) + defer stats.MeasureLatency(rdb.stat.RequestLatency.WithLabelValues(stats.Delete), time.Now()) + wb := gorocksdb.NewWriteBatch() defer wb.Destroy() wb.DeleteCF(rdb.ttlCF, key) @@ -330,6 +334,7 @@ func (rdb *rocksDB) Delete(key []byte) error { err := rdb.db.Write(rdb.opts.writeOpts, wb) if err != nil { rdb.opts.statsCli.Incr("rocksdb.delete.errors", 1) + rdb.stat.ResponseError.WithLabelValues(stats.Delete).Inc() } return err } @@ -346,6 +351,8 @@ func (rdb *rocksDB) Get(keys ...[]byte) ([]*serverpb.KVPair, error) { func (rdb *rocksDB) CompareAndSet(key, expect, update []byte) (bool, error) { defer rdb.opts.statsCli.Timing("rocksdb.cas.latency.ms", time.Now()) + defer stats.MeasureLatency(rdb.stat.RequestLatency.WithLabelValues(stats.CompareAndSet), time.Now()) + ro := rdb.opts.readOpts wo := rdb.opts.writeOpts to := gorocksdb.NewDefaultOptimisticTransactionOptions() @@ -371,6 +378,7 @@ func (rdb *rocksDB) CompareAndSet(key, expect, update []byte) (bool, error) { err = txn.Put(key, update) if err != nil { rdb.opts.statsCli.Incr("rocksdb.cas.set.errors", 1) + rdb.stat.ResponseError.WithLabelValues(stats.CompareAndSet).Inc() return false, err } err = txn.Commit() @@ -442,6 +450,7 @@ func (r *checkPointSnapshot) Close() error { func (rdb *rocksDB) GetSnapshot() (io.ReadCloser, error) { defer rdb.opts.statsCli.Timing("rocksdb.snapshot.get.latency.ms", time.Now()) + defer stats.MeasureLatency(rdb.stat.RequestLatency.WithLabelValues(stats.GetSnapShot), time.Now()) //Prevent any other backups or restores err := rdb.beginGlobalMutation() @@ -505,6 +514,7 @@ func (rdb *rocksDB) PutSnapshot(snap io.ReadCloser) error { return nil } defer rdb.opts.statsCli.Timing("rocksdb.snapshot.put.latency.ms", time.Now()) + defer stats.MeasureLatency(rdb.stat.RequestLatency.WithLabelValues(stats.PutSnapShot), time.Now()) defer snap.Close() //Prevent any other backups or restores @@ -614,6 +624,8 @@ func (rdb *rocksDB) GetLatestCommittedChangeNumber() (uint64, error) { func (rdb *rocksDB) LoadChanges(fromChangeNumber uint64, maxChanges int) ([]*serverpb.ChangeRecord, error) { defer rdb.opts.statsCli.Timing("rocksdb.load.changes.latency.ms", time.Now()) + defer stats.MeasureLatency(rdb.stat.RequestLatency.WithLabelValues(stats.LoadChange), time.Now()) + chngIter, err := rdb.db.GetUpdatesSince(fromChangeNumber) if err != nil { return nil, err @@ -636,6 +648,8 @@ func (rdb *rocksDB) GetLatestAppliedChangeNumber() (uint64, error) { func (rdb *rocksDB) SaveChanges(changes []*serverpb.ChangeRecord) (uint64, error) { defer rdb.opts.statsCli.Timing("rocksdb.save.changes.latency.ms", time.Now()) + defer stats.MeasureLatency(rdb.stat.RequestLatency.WithLabelValues(stats.SaveChange), time.Now()) + appldChngNum := uint64(0) for _, chng := range changes { wb := gorocksdb.WriteBatchFrom(chng.SerialisedForm) @@ -804,9 +818,12 @@ func parseTTLMsgPackData(valueWithTTL []byte) (*ttlDataFormat, error) { func (rdb *rocksDB) getSingleKey(ro *gorocksdb.ReadOptions, key []byte) ([]*serverpb.KVPair, error) { defer rdb.opts.statsCli.Timing("rocksdb.single.get.latency.ms", time.Now()) + defer stats.MeasureLatency(rdb.stat.RequestLatency.WithLabelValues(stats.Get), time.Now()) + values, err := rdb.db.MultiGetCFMultiCF(ro, []*gorocksdb.ColumnFamilyHandle{rdb.normalCF, rdb.ttlCF}, [][]byte{key, key}) if err != nil { rdb.opts.statsCli.Incr("rocksdb.single.get.errors", 1) + rdb.stat.ResponseError.WithLabelValues(stats.Get).Inc() return nil, err } value1, value2 := values[0], values[1] @@ -849,6 +866,7 @@ func (rdb *rocksDB) extractResult(value1 *gorocksdb.Slice, value2 *gorocksdb.Sli func (rdb *rocksDB) getMultipleKeys(ro *gorocksdb.ReadOptions, keys [][]byte) ([]*serverpb.KVPair, error) { defer rdb.opts.statsCli.Timing("rocksdb.multi.get.latency.ms", time.Now()) + defer stats.MeasureLatency(rdb.stat.RequestLatency.WithLabelValues(stats.MultiGet), time.Now()) kl := len(keys) reqCFs := make([]*gorocksdb.ColumnFamilyHandle, kl<<1) @@ -860,6 +878,7 @@ func (rdb *rocksDB) getMultipleKeys(ro *gorocksdb.ReadOptions, keys [][]byte) ([ values, err := rdb.db.MultiGetCFMultiCF(ro, reqCFs, append(keys, keys...)) if err != nil { rdb.opts.statsCli.Incr("rocksdb.multi.get.errors", 1) + rdb.stat.ResponseError.WithLabelValues(stats.MultiGet).Inc() return nil, err } diff --git a/internal/storage/store.go b/internal/storage/store.go index f9e3b072..bbe4e804 100644 --- a/internal/storage/store.go +++ b/internal/storage/store.go @@ -1,6 +1,8 @@ package storage import ( + "github.com/flipkart-incubator/dkv/internal/stats" + "github.com/prometheus/client_golang/prometheus" "io" "io/ioutil" "os" @@ -9,6 +11,28 @@ import ( "github.com/flipkart-incubator/dkv/pkg/serverpb" ) +type Stat struct { + RequestLatency *prometheus.SummaryVec + ResponseError *prometheus.CounterVec +} + +func NewStat() *Stat { + RequestLatency := prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Namespace: "storage", + Name: "latency", + Help: "Latency statistics for storage operations", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + MaxAge: 10 * time.Second, + }, []string{stats.Ops}) + ResponseError := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "storage", + Name: "error", + Help: "Error count for storage operations", + }, []string{stats.Ops}) + prometheus.MustRegister(RequestLatency, ResponseError) + return &Stat{RequestLatency, ResponseError} +} + // A KVStore represents the key value store that provides // the underlying storage implementation for the various // DKV operations. diff --git a/internal/www/css/main.css b/internal/www/css/main.css new file mode 100644 index 00000000..54945c61 --- /dev/null +++ b/internal/www/css/main.css @@ -0,0 +1,9 @@ +.chart.c3{ + width:33%; + float: left; +} + +.font-bold { + font-weight: bold; + font-size: 14px; +} diff --git a/internal/www/header-nav.html b/internal/www/header-nav.html new file mode 100644 index 00000000..525baf51 --- /dev/null +++ b/internal/www/header-nav.html @@ -0,0 +1,7 @@ + \ No newline at end of file diff --git a/internal/www/home.html b/internal/www/home.html new file mode 100644 index 00000000..e64a8f8f --- /dev/null +++ b/internal/www/home.html @@ -0,0 +1,66 @@ + + +
+
+
+
+

{{througputParser(getMapValSum(stat["dkv_req_count"]))}}

+
+
+
+
+
+
+
+
+
{{op}}

+

{{ througputParser(stat["dkv_req_count"][op]) }} rps

+
+ + + + + + + +
p50 {{latencyParser(stat["dkv_latency"][op].p50)}}
p90 {{latencyParser(stat["dkv_latency"][op].p90)}}
p99 {{latencyParser(stat["dkv_latency"][op].p99)}}
+
+
+
+
+
+
{{op}}

+

{{ througputParser(stat["dkv_req_count"][op]) }} rps

+
+ + + + + + + +
p50 {{latencyParser(stat["dkv_latency"][op].p50)}}
p90 {{latencyParser(stat["dkv_latency"][op].p90)}}
p99 {{latencyParser(stat["dkv_latency"][op].p99)}}
+
+
+
+
+
+
+
+
+ +
+
+ +
+
+ +
+
+ +
+
+
+
{{j}}
+
+
diff --git a/internal/www/index.html b/internal/www/index.html new file mode 100644 index 00000000..728567f5 --- /dev/null +++ b/internal/www/index.html @@ -0,0 +1,67 @@ + + + + DKV Dashboard + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+ + + \ No newline at end of file diff --git a/internal/www/js/config.js b/internal/www/js/config.js new file mode 100644 index 00000000..2a41e88d --- /dev/null +++ b/internal/www/js/config.js @@ -0,0 +1,9 @@ +CONFIG={ + "router": { + "Home":{ + "path":"/", + "controller":"HomeCtrl", + "templateUrl": "home.html", + }, + }, +}; diff --git a/internal/www/js/controllers.js b/internal/www/js/controllers.js new file mode 100644 index 00000000..c0c06952 --- /dev/null +++ b/internal/www/js/controllers.js @@ -0,0 +1,170 @@ +(function() { + + + angular.module("DKV.Dashboard.Controllers", ['DKV.Dashboard.Services','chart.js', 'dataGrid', 'pagination', 'ngCookies' ,'ngclipboard','mgcrea.ngStrap','ngSanitize']) + + .config(['ChartJsProvider', function (ChartJsProvider) { + ChartJsProvider.setOptions({ + responsive: false + }); + ChartJsProvider.setOptions('line', { + showLines: true, + animation: { + duration:0 + }, + legend: { + display: true, + position: 'bottom', + labels:{ + boxWidth:20 + } + }, + scales: { + yAxes: [{ + ticks: { + beginAtZero:true + }, + gridLines: { + display:false + } + }], + xAxes: [{ + type: 'time', + ticks: { + maxRotation: 0, + minRotation: 0, + autoSkip: true, + maxTickLimit: 3 + }, + gridLines: { + display:false + } + }] + }, + tooltips: { + intersect:false, + callbacks : { + title: function(tooltipItems, data) { + return tooltipItems[0].xLabel.toLocaleTimeString(); + } + } + }, + elements: { + line: { + fill: false, + borderWidth: 1 + }, + point: { + radius: 1 + } + + } + }); + }]) + + + .controller("HomeCtrl", [ '$location', '$scope', '$rootScope','$timeout','$sce','DKVService', + function($location, $scope, $rootScope ,$timeout, $sce, DKVService) { + /* discover master nodes from here*/ + $scope.masters = { + "m1":"http://127.0.0.1:7081", + "m2":"http://127.0.0.1:7082", + "m3":"http://127.0.0.1:7083", + } + + $scope.througputParser = througputParser + $scope.latencyParser = latencyParser + $scope.getMapValSum = getMapValSum + + $scope.ops1 = ["getLin","getSeq","mgetLin","mgetSeq",] + $scope.ops2 = ["put","mput","cas","del"] + + $scope.source = new EventSource(DKVService.GetClusterData($scope.masters.m1)); + $scope.source.addEventListener('message', function(e) { + data = JSON.parse(e.data) + $scope.stat = data["global"] + delete data["global"] + $scope.stats = data + /* initialize after the first population */ + if ( $scope.series === undefined ) { + initMetric() + } + + console.log(new Date($scope.stat.ts)) + console.log(new Date()) + $scope.tsEvent.push(new Date($scope.stat.ts)); + + for ( i = 0 ; i < $scope.series.length; i++ ) { + $scope.ts.rrate.data[i].push(getMapValSum($scope.stats[$scope.series[i]]["dkv_req_count"])) + $scope.ts.error.data[i].push(getMapValSum($scope.stats[$scope.series[i]]["dkv_req_count"]) / 100) + avgLatency = getAvgLatency($scope.stats[$scope.series[i]]["dkv_latency"]) + $scope.ts.p50.data[i].push(avgLatency.p50) + $scope.ts.p99.data[i].push(avgLatency.p99) + } + + if ( $scope.tsEvent.length > 60 ) { + $scope.tsEvent.splice(0,1); + for ( i = 0 ; i < $scope.series.length; i++ ) { + $scope.ts.rrate.data[i].splice(0,1); + $scope.ts.error.data[i].splice(0,1); + $scope.ts.p50.data[i].splice(0,1); + $scope.ts.p99.data[i].splice(0,1); + } + } + + }); + + function createStat(parser,titleText) { + var stat = { series : $scope.series, colors: $scope.colors, label: $scope.tsEvent, data : getEmptyArray($scope.series.length), + options: { + responsive: true, + maintainAspectRatio: false, + scales: { + yAxes: [{ + ticks: { + callback: function (value, index, values) { + return parser(value) + } + }, + }] + }, + title: { + display: true, text: titleText, + fontSize: 13, fontColor: "#000000" + }, + legend: { + display: false + }, + elements: { + line: { + fill: false, + borderWidth: 1.7 + }, + point: { + radius: 0 + } + + } + } + }; + return stat + } + function initMetric() { + $scope.ts = {} + $scope.tsEvent = [] + $scope.series = Object.keys($scope.stats) + $scope.colors = [colorGreen,colorOrange,colorBlue,colorRed,colorViolet,colorDeepBlue] + + $scope.ts.rrate = createStat(througputParser,"request rate") + $scope.ts.error = createStat(percentageParser,"error rate") + $scope.ts.p50 = createStat(latencyParser,"50th percentile") + $scope.ts.p99 = createStat(latencyParser,"99th percentile") + } + + window.setInterval(function(){ + $scope.$apply(function () { + }); + }, 1000); + } + ]) +}()); diff --git a/internal/www/js/directives.js b/internal/www/js/directives.js new file mode 100644 index 00000000..104cc171 --- /dev/null +++ b/internal/www/js/directives.js @@ -0,0 +1,13 @@ +(function() { + + angular.module("DKV.Dashboard.Directives", ["DKV.Dashboard.Controllers"]) + .directive("headerNav", function() { + return { + restrict: "E", + templateUrl: "header-nav.html", + controller: function($scope, $rootScope, $location, $sce) { + $scope.config = CONFIG; + } + }; + }) +}()); diff --git a/internal/www/js/lib.js b/internal/www/js/lib.js new file mode 100644 index 00000000..85feb6a2 --- /dev/null +++ b/internal/www/js/lib.js @@ -0,0 +1,75 @@ +var througputParser = function (d) { + if ( d === undefined ) { + return 0 + } + var sizes = ['', 'K', 'M', 'B', 'T']; + if (d < 1) return d.toFixed(1); + var i = Math.floor(Math.log(d) / Math.log(1000)); + var base = d / Math.pow(1000, i); + if ( Math.round(base) === base ){ + return base + ' ' + sizes[i] + } + return base.toFixed(1) + ' ' + sizes[i]; +}; + +var latencyParser = function (d) { + if ( d === undefined ) { + return 0 + } + d = d * 1000000 + var sizes = ['µs', 'ms', 's']; + var i = Math.floor(Math.log(d) / Math.log(1000)); + var base = d / Math.pow(1000, i); + if ( Math.round(base) === base ){ + return base + ' ' + sizes[i] + } + return base.toFixed(1) + ' ' + sizes[i]; +}; + +var getMapValSum = function (d) { + let sum = 0; + for (let key in d) { + sum += d[key]; + } + return sum +} + +var getAvgLatency = function (d) { + latency = { p50: 0 , p90 : 0 , p99 : 0} + + console.log(d) + for (let key in d ) { + latency.p50 += d[key].p50 + latency.p90 += d[key].p90 + latency.p99 += d[key].p99 + } + console.log(latency) + + count = Object.keys(d).length + latency.p50 /= count + latency.p90 /= count + latency.p99 /= count + + console.log(latency) + return latency +} + +var percentageParser = function (d) { + if (Math.round(d) === d) return Math.round(d)+" %"; + return d.toFixed(2)+" %"; +} + +function getEmptyArray(size){ + var data = []; + for (i = 0; i < size; i++) { + data.push([]); + } + return data; +} +var colorGreen = {backgroundColor: '#62a043', borderColor: '#62a043', hoverBackgroundColor: '#62a043', hoverBorderColor: '#62a043'}; +var colorOrange = {backgroundColor: '#dc923f', borderColor: '#dc923f', hoverBackgroundColor: '#dc923f', hoverBorderColor: '#dc923f'}; +var colorBlue = {backgroundColor: '#0a9bdc', borderColor: '#0a9bdc', hoverBackgroundColor: '#0a9bdc', hoverBorderColor: '#0a9bdc'}; +var colorRed = {backgroundColor: '#bc4040', borderColor: '#bc4040', hoverBackgroundColor: '#bc4040', hoverBorderColor: '#bc4040'}; +var colorYellow = {backgroundColor: '#ffe50c', borderColor: '#ffe50c', hoverBackgroundColor: '#ffe50c', hoverBorderColor: '#ffe50c'}; +var colorViolet = {backgroundColor: '#9263ff', borderColor: '#9263ff', hoverBackgroundColor: '#9263ff', hoverBorderColor: '#9263ff'}; +var colorDeepBlue = {backgroundColor: '#0001ff', borderColor: '#0001ff', hoverBackgroundColor: '#0001ff', hoverBorderColor: '#0001ff'}; diff --git a/internal/www/js/main.js b/internal/www/js/main.js new file mode 100644 index 00000000..75ab49d5 --- /dev/null +++ b/internal/www/js/main.js @@ -0,0 +1,25 @@ +(function() { + + angular.module("DKV.Dashboard",['ngRoute', 'DKV.Dashboard.Services', 'DKV.Dashboard.Controllers', 'DKV.Dashboard.Directives']) + + /** routes */ + .config( function($routeProvider,$locationProvider) { + for (id in CONFIG.router) { + $routeProvider.when(CONFIG.router[id].path, { + templateUrl: CONFIG.router[id].templateUrl, + controller: CONFIG.router[id].controller + }); + } + $routeProvider.otherwise({ + redirectTo: "/" + }); + $locationProvider.html5Mode(true); + }) + + .run(['$rootScope', '$location', '$anchorScroll', function($rootScope, $location, $anchorScroll) { + $rootScope.$on('$routeChangeSuccess', function(newRoute, oldRoute) { + if ($location.hash()) $anchorScroll(); + }); + }]); + +}()); diff --git a/internal/www/js/services.js b/internal/www/js/services.js new file mode 100644 index 00000000..31b81464 --- /dev/null +++ b/internal/www/js/services.js @@ -0,0 +1,12 @@ +(function() { + angular.module("DKV.Dashboard.Services", []) + + .factory("DKVService", [ '$http','$sce', function($http,$sce) { + return { + GetClusterData: function(endpoint) { + return $sce.trustAsResourceUrl(endpoint+"/metrics/cluster") + }, + } + }]) +}()); + diff --git a/internal/www/run.sh b/internal/www/run.sh new file mode 100644 index 00000000..f4a4c5fb --- /dev/null +++ b/internal/www/run.sh @@ -0,0 +1,3 @@ +#!/usr/bin/env bash + +python -m SimpleHTTPServer 8989 \ No newline at end of file diff --git a/pkg/serverpb/admin.pb.go b/pkg/serverpb/admin.pb.go index b07244b1..453da423 100644 --- a/pkg/serverpb/admin.pb.go +++ b/pkg/serverpb/admin.pb.go @@ -1039,6 +1039,9 @@ type RegionInfo struct { // Nexus cluster url of the region // Will be used by new followers to discover the raft cluster NexusClusterUrl *string `protobuf:"bytes,7,opt,name=nexusClusterUrl,proto3,oneof" json:"nexusClusterUrl,omitempty"` + // http listener of the node. + // http endpoint is useful for admin API interactions + HttpAddress string `protobuf:"bytes,8,opt,name=httpAddress,proto3" json:"httpAddress,omitempty"` } func (x *RegionInfo) Reset() { @@ -1122,6 +1125,13 @@ func (x *RegionInfo) GetNexusClusterUrl() string { return "" } +func (x *RegionInfo) GetHttpAddress() string { + if x != nil { + return x.HttpAddress + } + return "" +} + var File_pkg_serverpb_admin_proto protoreflect.FileDescriptor var file_pkg_serverpb_admin_proto_rawDesc = []byte{ @@ -1235,7 +1245,7 @@ var file_pkg_serverpb_admin_proto_rawDesc = []byte{ 0x12, 0x3a, 0x0a, 0x0b, 0x72, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x52, - 0x0b, 0x72, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x73, 0x22, 0xa3, 0x02, 0x0a, + 0x0b, 0x72, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x73, 0x22, 0xc5, 0x02, 0x0a, 0x0a, 0x52, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x63, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x64, 0x63, 0x49, 0x44, 0x12, 0x20, 0x0a, 0x0b, 0x6e, 0x6f, 0x64, 0x65, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x02, @@ -1251,78 +1261,80 @@ var file_pkg_serverpb_admin_proto_rawDesc = []byte{ 0x00, 0x52, 0x0a, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x48, 0x6f, 0x73, 0x74, 0x88, 0x01, 0x01, 0x12, 0x2d, 0x0a, 0x0f, 0x6e, 0x65, 0x78, 0x75, 0x73, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x55, 0x72, 0x6c, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x48, 0x01, 0x52, 0x0f, 0x6e, 0x65, 0x78, - 0x75, 0x73, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x55, 0x72, 0x6c, 0x88, 0x01, 0x01, 0x42, - 0x0d, 0x0a, 0x0b, 0x5f, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x48, 0x6f, 0x73, 0x74, 0x42, 0x12, - 0x0a, 0x10, 0x5f, 0x6e, 0x65, 0x78, 0x75, 0x73, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x55, - 0x72, 0x6c, 0x2a, 0x68, 0x0a, 0x0c, 0x52, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, - 0x75, 0x73, 0x12, 0x0c, 0x0a, 0x08, 0x49, 0x4e, 0x41, 0x43, 0x54, 0x49, 0x56, 0x45, 0x10, 0x00, - 0x12, 0x0a, 0x0a, 0x06, 0x4c, 0x45, 0x41, 0x44, 0x45, 0x52, 0x10, 0x01, 0x12, 0x14, 0x0a, 0x10, - 0x50, 0x52, 0x49, 0x4d, 0x41, 0x52, 0x59, 0x5f, 0x46, 0x4f, 0x4c, 0x4c, 0x4f, 0x57, 0x45, 0x52, - 0x10, 0x02, 0x12, 0x16, 0x0a, 0x12, 0x53, 0x45, 0x43, 0x4f, 0x4e, 0x44, 0x41, 0x52, 0x59, 0x5f, - 0x46, 0x4f, 0x4c, 0x4c, 0x4f, 0x57, 0x45, 0x52, 0x10, 0x03, 0x12, 0x10, 0x0a, 0x0c, 0x41, 0x43, - 0x54, 0x49, 0x56, 0x45, 0x5f, 0x53, 0x4c, 0x41, 0x56, 0x45, 0x10, 0x04, 0x32, 0xae, 0x02, 0x0a, - 0x0e, 0x44, 0x4b, 0x56, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, - 0x4f, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x12, 0x1f, 0x2e, - 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, - 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, - 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x47, 0x65, - 0x74, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x39, 0x0a, 0x0a, 0x41, 0x64, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x12, 0x15, + 0x75, 0x73, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x55, 0x72, 0x6c, 0x88, 0x01, 0x01, 0x12, + 0x20, 0x0a, 0x0b, 0x68, 0x74, 0x74, 0x70, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x08, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x68, 0x74, 0x74, 0x70, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, + 0x73, 0x42, 0x0d, 0x0a, 0x0b, 0x5f, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x48, 0x6f, 0x73, 0x74, + 0x42, 0x12, 0x0a, 0x10, 0x5f, 0x6e, 0x65, 0x78, 0x75, 0x73, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, + 0x72, 0x55, 0x72, 0x6c, 0x2a, 0x68, 0x0a, 0x0c, 0x52, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x53, 0x74, + 0x61, 0x74, 0x75, 0x73, 0x12, 0x0c, 0x0a, 0x08, 0x49, 0x4e, 0x41, 0x43, 0x54, 0x49, 0x56, 0x45, + 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x4c, 0x45, 0x41, 0x44, 0x45, 0x52, 0x10, 0x01, 0x12, 0x14, + 0x0a, 0x10, 0x50, 0x52, 0x49, 0x4d, 0x41, 0x52, 0x59, 0x5f, 0x46, 0x4f, 0x4c, 0x4c, 0x4f, 0x57, + 0x45, 0x52, 0x10, 0x02, 0x12, 0x16, 0x0a, 0x12, 0x53, 0x45, 0x43, 0x4f, 0x4e, 0x44, 0x41, 0x52, + 0x59, 0x5f, 0x46, 0x4f, 0x4c, 0x4c, 0x4f, 0x57, 0x45, 0x52, 0x10, 0x03, 0x12, 0x10, 0x0a, 0x0c, + 0x41, 0x43, 0x54, 0x49, 0x56, 0x45, 0x5f, 0x53, 0x4c, 0x41, 0x56, 0x45, 0x10, 0x04, 0x32, 0xae, + 0x02, 0x0a, 0x0e, 0x44, 0x4b, 0x56, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x12, 0x4f, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x12, + 0x1f, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x47, + 0x65, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x20, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, + 0x47, 0x65, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x41, 0x64, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, + 0x12, 0x15, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, + 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x1a, 0x14, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, + 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x3c, 0x0a, + 0x0d, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x12, 0x15, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x1a, 0x14, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, - 0x65, 0x72, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x3c, 0x0a, 0x0d, 0x52, - 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x12, 0x15, 0x2e, 0x64, - 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x70, 0x6c, - 0x69, 0x63, 0x61, 0x1a, 0x14, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, - 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x52, 0x0a, 0x0b, 0x47, 0x65, 0x74, - 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x73, 0x12, 0x20, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, - 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x70, 0x6c, 0x69, - 0x63, 0x61, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, 0x64, 0x6b, 0x76, + 0x65, 0x72, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x52, 0x0a, 0x0b, 0x47, + 0x65, 0x74, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x73, 0x12, 0x20, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x70, - 0x6c, 0x69, 0x63, 0x61, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x8e, 0x01, - 0x0a, 0x10, 0x44, 0x4b, 0x56, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73, 0x74, 0x6f, - 0x72, 0x65, 0x12, 0x3b, 0x0a, 0x06, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x12, 0x1b, 0x2e, 0x64, - 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x42, 0x61, 0x63, 0x6b, - 0x75, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x64, 0x6b, 0x76, 0x2e, - 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, - 0x3d, 0x0a, 0x07, 0x52, 0x65, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x12, 0x1c, 0x2e, 0x64, 0x6b, 0x76, - 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x73, 0x74, 0x6f, 0x72, - 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, - 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x32, 0xd6, - 0x01, 0x0a, 0x0a, 0x44, 0x4b, 0x56, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x12, 0x3d, 0x0a, - 0x07, 0x41, 0x64, 0x64, 0x4e, 0x6f, 0x64, 0x65, 0x12, 0x1c, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, - 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x41, 0x64, 0x64, 0x4e, 0x6f, 0x64, 0x65, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, - 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x43, 0x0a, 0x0a, - 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x12, 0x1f, 0x2e, 0x64, 0x6b, 0x76, - 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, - 0x4e, 0x6f, 0x64, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x64, 0x6b, - 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, - 0x73, 0x12, 0x44, 0x0a, 0x09, 0x4c, 0x69, 0x73, 0x74, 0x4e, 0x6f, 0x64, 0x65, 0x73, 0x12, 0x16, - 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, - 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1f, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, - 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4e, 0x6f, 0x64, 0x65, 0x73, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0xb4, 0x01, 0x0a, 0x0c, 0x44, 0x4b, 0x56, 0x44, - 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x12, 0x47, 0x0a, 0x0c, 0x55, 0x70, 0x64, 0x61, - 0x74, 0x65, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x21, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, - 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x53, 0x74, - 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x64, 0x6b, + 0x6c, 0x69, 0x63, 0x61, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, 0x64, + 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x52, + 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, + 0x8e, 0x01, 0x0a, 0x10, 0x44, 0x4b, 0x56, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73, + 0x74, 0x6f, 0x72, 0x65, 0x12, 0x3b, 0x0a, 0x06, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x12, 0x1b, + 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x42, 0x61, + 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, - 0x73, 0x12, 0x5b, 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, - 0x6e, 0x66, 0x6f, 0x12, 0x23, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, - 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x6e, 0x66, - 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, - 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, - 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x51, - 0x0a, 0x10, 0x44, 0x4b, 0x56, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x4e, 0x6f, - 0x64, 0x65, 0x12, 0x3d, 0x0a, 0x09, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, - 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, - 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x18, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, - 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, - 0x6f, 0x42, 0x30, 0x5a, 0x2e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, - 0x66, 0x6c, 0x69, 0x70, 0x6b, 0x61, 0x72, 0x74, 0x2d, 0x69, 0x6e, 0x63, 0x75, 0x62, 0x61, 0x74, - 0x6f, 0x72, 0x2f, 0x64, 0x6b, 0x76, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x65, - 0x72, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x73, 0x12, 0x3d, 0x0a, 0x07, 0x52, 0x65, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x12, 0x1c, 0x2e, 0x64, + 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x73, 0x74, + 0x6f, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x64, 0x6b, 0x76, + 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, + 0x32, 0xd6, 0x01, 0x0a, 0x0a, 0x44, 0x4b, 0x56, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x12, + 0x3d, 0x0a, 0x07, 0x41, 0x64, 0x64, 0x4e, 0x6f, 0x64, 0x65, 0x12, 0x1c, 0x2e, 0x64, 0x6b, 0x76, + 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x41, 0x64, 0x64, 0x4e, 0x6f, 0x64, + 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, + 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x43, + 0x0a, 0x0a, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x12, 0x1f, 0x2e, 0x64, + 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x6d, 0x6f, + 0x76, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, + 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x12, 0x44, 0x0a, 0x09, 0x4c, 0x69, 0x73, 0x74, 0x4e, 0x6f, 0x64, 0x65, 0x73, + 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1f, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, + 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4e, 0x6f, 0x64, 0x65, + 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0xb4, 0x01, 0x0a, 0x0c, 0x44, 0x4b, + 0x56, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x12, 0x47, 0x0a, 0x0c, 0x55, 0x70, + 0x64, 0x61, 0x74, 0x65, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x21, 0x2e, 0x64, 0x6b, 0x76, + 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, + 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, + 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x12, 0x5b, 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, + 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x23, 0x2e, 0x64, 0x6b, 0x76, 0x2e, 0x73, 0x65, 0x72, 0x76, + 0x65, 0x72, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, + 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x64, 0x6b, 0x76, + 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6c, 0x75, + 0x73, 0x74, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x32, 0x51, 0x0a, 0x10, 0x44, 0x4b, 0x56, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, + 0x4e, 0x6f, 0x64, 0x65, 0x12, 0x3d, 0x0a, 0x09, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x18, 0x2e, 0x64, 0x6b, 0x76, 0x2e, + 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x49, + 0x6e, 0x66, 0x6f, 0x42, 0x30, 0x5a, 0x2e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x66, 0x6c, 0x69, 0x70, 0x6b, 0x61, 0x72, 0x74, 0x2d, 0x69, 0x6e, 0x63, 0x75, 0x62, + 0x61, 0x74, 0x6f, 0x72, 0x2f, 0x64, 0x6b, 0x76, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x73, 0x65, 0x72, + 0x76, 0x65, 0x72, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/pkg/serverpb/admin.proto b/pkg/serverpb/admin.proto index 6d290c8e..e07eb8e8 100644 --- a/pkg/serverpb/admin.proto +++ b/pkg/serverpb/admin.proto @@ -189,6 +189,9 @@ message RegionInfo { // Nexus cluster url of the region // Will be used by new followers to discover the raft cluster optional string nexusClusterUrl = 7; + // http listener of the node. + // http endpoint is useful for admin API interactions + string httpAddress = 8; } enum RegionStatus {