Skip to content

Commit

Permalink
Prometheus Metrics (#123)
Browse files Browse the repository at this point in the history
- Expose Prometheus metrics over http-listen-addr
- Prometheus cluster metrics aggregator
- Better replication lag metrics
- extras/prometheus-dashboard

      
Co-authored-by: Kinshuk <[email protected]>
Co-authored-by: Sangam Kumar Jain <[email protected]>
Co-authored-by: Shubham Aggarwal <[email protected]>
  • Loading branch information
shubhamaggarwal authored Apr 13, 2022
1 parent 2b1be08 commit 468cf90
Show file tree
Hide file tree
Showing 32 changed files with 2,545 additions and 140 deletions.
42 changes: 21 additions & 21 deletions clients/java/dkv-client/src/test/resources/Procfile
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
# run using `goreman -exit-on-error start`

dkv_discovery1:./bin/dkvsrv --role discovery --listen-addr 127.0.0.1:8001 --node-name d1 --nexus-node-url "http://127.0.0.1:7021" --nexus-cluster-url "http://127.0.0.1:7021,http://127.0.0.1:7022,http://127.0.0.1:7023" --config clients/java/dkv-client/src/test/resources/dkv_config.yaml
dkv_discovery2:./bin/dkvsrv --role discovery --listen-addr 127.0.0.1:8002 --node-name d2 --nexus-node-url "http://127.0.0.1:7022" --nexus-cluster-url "http://127.0.0.1:7021,http://127.0.0.1:7022,http://127.0.0.1:7023" --config clients/java/dkv-client/src/test/resources/dkv_config.yaml
dkv_discovery3:./bin/dkvsrv --role discovery --listen-addr 127.0.0.1:8003 --node-name d3 --nexus-node-url "http://127.0.0.1:7023" --nexus-cluster-url "http://127.0.0.1:7021,http://127.0.0.1:7022,http://127.0.0.1:7023" --config clients/java/dkv-client/src/test/resources/dkv_config.yaml
dkv_discovery1:./bin/dkvsrv --role discovery --listen-addr 127.0.0.1:8001 --http-listen-addr 127.0.0.1:8005 --node-name d1 --nexus-node-url "http://127.0.0.1:7021" --nexus-cluster-url "http://127.0.0.1:7021,http://127.0.0.1:7022,http://127.0.0.1:7023" --config clients/java/dkv-client/src/test/resources/dkv_config.yaml
dkv_discovery2:./bin/dkvsrv --role discovery --listen-addr 127.0.0.1:8002 --http-listen-addr 127.0.0.1:8006 --node-name d2 --nexus-node-url "http://127.0.0.1:7022" --nexus-cluster-url "http://127.0.0.1:7021,http://127.0.0.1:7022,http://127.0.0.1:7023" --config clients/java/dkv-client/src/test/resources/dkv_config.yaml
dkv_discovery3:./bin/dkvsrv --role discovery --listen-addr 127.0.0.1:8003 --http-listen-addr 127.0.0.1:8007 --node-name d3 --nexus-node-url "http://127.0.0.1:7023" --nexus-cluster-url "http://127.0.0.1:7021,http://127.0.0.1:7022,http://127.0.0.1:7023" --config clients/java/dkv-client/src/test/resources/dkv_config.yaml

dkv_smaster1:./bin/dkvsrv --role master --listen-addr 127.0.0.1:7080 --node-name m1 --dc-id dc1 --database s0 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_smaster2:./bin/dkvsrv --role master --listen-addr 127.0.0.1:8080 --node-name m2 --dc-id dc1 --database s1 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_smaster3:./bin/dkvsrv --role master --listen-addr 127.0.0.1:9080 --node-name m3 --dc-id dc1 --database s2 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_smaster1:./bin/dkvsrv --role master --listen-addr 127.0.0.1:7080 --http-listen-addr 127.0.0.1:7081 --node-name m1 --dc-id dc1 --database s0 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_smaster2:./bin/dkvsrv --role master --listen-addr 127.0.0.1:8080 --http-listen-addr 127.0.0.1:8081 --node-name m2 --dc-id dc1 --database s1 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_smaster3:./bin/dkvsrv --role master --listen-addr 127.0.0.1:9080 --http-listen-addr 127.0.0.1:9081 --node-name m3 --dc-id dc1 --database s2 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml

dkv_slave1a:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:7091 --node-name s1a --database s0 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave1b:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:7092 --node-name s1b --database s0 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave1c:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:7093 --node-name s1c --database s0 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave1d:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:7094 --node-name s1d --database s0 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave1a:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:7091 --http-listen-addr 127.0.0.1:7095 --node-name s1a --database s0 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave1b:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:7092 --http-listen-addr 127.0.0.1:7096 --node-name s1b --database s0 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave1c:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:7093 --http-listen-addr 127.0.0.1:7097 --node-name s1c --database s0 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave1d:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:7094 --http-listen-addr 127.0.0.1:7098 --node-name s1d --database s0 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml

dkv_slave2a:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:8091 --node-name s2a --database s1 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave2b:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:8092 --node-name s2b --database s1 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave2c:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:8093 --node-name s2c --database s1 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave2d:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:8094 --node-name s2d --database s1 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave2a:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:8091 --http-listen-addr 127.0.0.1:8095 --node-name s2a --database s1 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave2b:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:8092 --http-listen-addr 127.0.0.1:8096 --node-name s2b --database s1 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave2c:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:8093 --http-listen-addr 127.0.0.1:8097 --node-name s2c --database s1 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave2d:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:8094 --http-listen-addr 127.0.0.1:8098 --node-name s2d --database s1 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml

dkv_slave3a:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:9091 --node-name s3a --database s2 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave3b:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:9092 --node-name s3b --database s2 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave3c:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:9093 --node-name s3c --database s2 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave3d:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:9094 --node-name s3d --database s2 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave3a:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:9091 --http-listen-addr 127.0.0.1:9095 --node-name s3a --database s2 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave3b:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:9092 --http-listen-addr 127.0.0.1:9096 --node-name s3b --database s2 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave3c:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:9093 --http-listen-addr 127.0.0.1:9097 --node-name s3c --database s2 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml
dkv_slave3d:./bin/dkvsrv --diskless --db-engine badger --role slave --listen-addr 127.0.0.1:9094 --http-listen-addr 127.0.0.1:9098 --node-name s3d --database s2 --config clients/java/dkv-client/src/test/resources/standalone_config.yaml

dkv_master1:./bin/dkvsrv --role master --listen-addr 127.0.0.1:6080 --node-name s0 --nexus-node-url "http://127.0.0.1:8021" --nexus-cluster-url "http://127.0.0.1:8021,http://127.0.0.1:8022,http://127.0.0.1:8023" --config clients/java/dkv-client/src/test/resources/dkv_config.yaml
dkv_master2:./bin/dkvsrv --role master --listen-addr 127.0.0.1:6081 --node-name s1 --nexus-node-url "http://127.0.0.1:8022" --nexus-cluster-url "http://127.0.0.1:8021,http://127.0.0.1:8022,http://127.0.0.1:8023" --config clients/java/dkv-client/src/test/resources/dkv_config.yaml
dkv_master3:./bin/dkvsrv --role master --listen-addr 127.0.0.1:6082 --node-name s2 --nexus-node-url "http://127.0.0.1:8023" --nexus-cluster-url "http://127.0.0.1:8021,http://127.0.0.1:8022,http://127.0.0.1:8023" --config clients/java/dkv-client/src/test/resources/dkv_config.yaml
dkv_master1:./bin/dkvsrv --role master --listen-addr 127.0.0.1:6080 --http-listen-addr 127.0.0.1:6085 --node-name s0 --nexus-node-url "http://127.0.0.1:8021" --nexus-cluster-url "http://127.0.0.1:8021,http://127.0.0.1:8022,http://127.0.0.1:8023" --config clients/java/dkv-client/src/test/resources/dkv_config.yaml
dkv_master2:./bin/dkvsrv --role master --listen-addr 127.0.0.1:6081 --http-listen-addr 127.0.0.1:6086 --node-name s1 --nexus-node-url "http://127.0.0.1:8022" --nexus-cluster-url "http://127.0.0.1:8021,http://127.0.0.1:8022,http://127.0.0.1:8023" --config clients/java/dkv-client/src/test/resources/dkv_config.yaml
dkv_master3:./bin/dkvsrv --role master --listen-addr 127.0.0.1:6082 --http-listen-addr 127.0.0.1:6087 --node-name s2 --nexus-node-url "http://127.0.0.1:8023" --nexus-cluster-url "http://127.0.0.1:8021,http://127.0.0.1:8022,http://127.0.0.1:8023" --config clients/java/dkv-client/src/test/resources/dkv_config.yaml
162 changes: 145 additions & 17 deletions cmd/dkvsrv/main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package main

import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
Expand All @@ -12,11 +15,17 @@ import (
"strings"
"syscall"

"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"gopkg.in/ini.v1"

"github.com/flipkart-incubator/dkv/internal/discovery"
"github.com/flipkart-incubator/dkv/internal/master"
"github.com/flipkart-incubator/dkv/internal/opts"
"github.com/flipkart-incubator/dkv/internal/slave"
"github.com/flipkart-incubator/dkv/internal/stats"
"github.com/flipkart-incubator/dkv/internal/stats/aggregate"
"github.com/flipkart-incubator/dkv/internal/storage"
"github.com/flipkart-incubator/dkv/internal/storage/badger"
"github.com/flipkart-incubator/dkv/internal/storage/rocksdb"
Expand All @@ -32,9 +41,8 @@ import (
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
"gopkg.in/ini.v1"

_ "net/http/pprof"
"net/http/pprof"
)

type dkvSrvrRole string
Expand Down Expand Up @@ -64,10 +72,15 @@ var (
verboseLogging bool
accessLogger *zap.Logger
dkvLogger *zap.Logger
pprofEnable bool

// Other vars
statsCli stats.Client
pprofEnable bool
statsCli stats.Client
promRegistry prometheus.Registerer
statsStreamer *stats.StatStreamer

discoveryClient discovery.Client
statAggregatorRegistry *aggregate.StatAggregatorRegistry
)

func init() {
Expand All @@ -92,13 +105,7 @@ func main() {
setupAccessLogger()
setFlagsForNexusDirs()
setupStats()

if pprofEnable {
go func() {
log.Printf("[INFO] Starting pprof on port 6060\n")
log.Println(http.ListenAndServe("0.0.0.0:6060", nil))
}()
}
go setupHttpServer()

kvs, cp, ca, br := newKVStore()
grpcSrvr, lstnr := newGrpcServerListener()
Expand All @@ -107,13 +114,21 @@ func main() {
//srvrRole.printFlags()

// Create the region info which is passed to DKVServer
nodeAddr, err := nodeAddress()
nodeAddr, err := nodeAddress(config.ListenAddr)
if err != nil {
log.Panicf("Failed to parse GRPC Listen Address %v.", err)
}

//HTTP listen Address
nodeHTTPAddr, err := nodeAddress(config.HttpListenAddr)
if err != nil {
log.Panicf("Failed to detect IP Address %v.", err)
log.Panicf("Failed to parse HTTP Listen Address %v.", err)
}

regionInfo := &serverpb.RegionInfo{
DcID: config.DcID,
NodeAddress: nodeAddr.Host,
HttpAddress: nodeHTTPAddr.Host,
Database: config.Database,
VBucket: config.VBucket,
Status: serverpb.RegionStatus_INACTIVE,
Expand All @@ -125,9 +140,9 @@ func main() {
Logger: dkvLogger,
HealthCheckTickerInterval: opts.DefaultHealthCheckTickterInterval, //to be exposed later via app.conf
StatsCli: statsCli,
PrometheusRegistry: promRegistry,
}

var discoveryClient discovery.Client
if srvrRole != noRole && srvrRole != discoveryRole {
var err error
discoveryClient, err = newDiscoveryClient()
Expand Down Expand Up @@ -327,6 +342,10 @@ func setupStats() {
} else {
statsCli = stats.NewNoOpClient()
}
promRegistry = stats.NewPromethousRegistry()
statsStreamer = stats.NewStatStreamer()
statAggregatorRegistry = aggregate.NewStatAggregatorRegistry()
go statsStreamer.Run()
}

func newKVStore() (storage.KVStore, storage.ChangePropagator, storage.ChangeApplier, storage.Backupable) {
Expand All @@ -353,7 +372,8 @@ func newKVStore() (storage.KVStore, storage.ChangePropagator, storage.ChangeAppl
rocksdb.WithCacheSize(config.BlockCacheSize),
rocksdb.WithRocksDBConfig(config.DbEngineIni),
rocksdb.WithLogger(dkvLogger),
rocksdb.WithStats(statsCli))
rocksdb.WithStats(statsCli),
rocksdb.WithPromStats(promRegistry))
if err != nil {
dkvLogger.Panic("RocksDB engine init failed", zap.Error(err))
}
Expand All @@ -368,6 +388,7 @@ func newKVStore() (storage.KVStore, storage.ChangePropagator, storage.ChangeAppl
badger.WithBadgerConfig(config.DbEngineIni),
badger.WithLogger(dkvLogger),
badger.WithStats(statsCli),
badger.WithPromStats(promRegistry),
}
if config.DisklessMode {
bdbOpts = append(bdbOpts, badger.WithInMemory())
Expand Down Expand Up @@ -452,8 +473,8 @@ func newDiscoveryClient() (discovery.Client, error) {

}

func nodeAddress() (*url.URL, error) {
ip, port, err := net.SplitHostPort(config.ListenAddr)
func nodeAddress(listenAddress string) (*url.URL, error) {
ip, port, err := net.SplitHostPort(listenAddress)
if err != nil {
return nil, err
}
Expand All @@ -477,3 +498,110 @@ func nodeAddress() (*url.URL, error) {
ep := url.URL{Host: fmt.Sprintf("%s:%s", ip, port)}
return &ep, nil
}

func setupHttpServer() {
router := mux.NewRouter()
router.Handle("/metrics", promhttp.Handler())
router.HandleFunc("/metrics/json", jsonMetricHandler)

router.HandleFunc("/metrics/stream", statsStreamHandler)
if toDKVSrvrRole(config.DbRole) == masterRole {
// Should be enabled only for discovery server ?
router.HandleFunc("/metrics/cluster", clusterMetricsHandler)
}

//Pprof
if pprofEnable {
log.Printf("[INFO] Enabling pprof...\n")
router.HandleFunc("/debug/pprof/", pprof.Index)
router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
router.HandleFunc("/debug/pprof/profile", pprof.Profile)
router.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
router.HandleFunc("/debug/pprof/trace", pprof.Trace)
}

http.Handle("/", router)
http.ListenAndServe(config.HttpListenAddr, nil)
}

func jsonMetricHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
metrics, _ := stats.GetMetrics()
json.NewEncoder(w).Encode(metrics)
}

func statsStreamHandler(w http.ResponseWriter, r *http.Request) {
if f, ok := w.(http.Flusher); !ok {
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
return
} else {
// Set the headers related to event streaming.
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", r.Header.Get("Origin"))
w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")

statChannel := make(chan stats.DKVMetrics, 5)
channelId := statsStreamer.Register(statChannel)
defer func() {
io.Copy(ioutil.Discard, r.Body)
r.Body.Close()
}()
// Listen to the closing of the http connection via the CloseNotifier
log.Printf("[INFO] Starting Metrics Stream %v\n", channelId)
for {
select {
case stat := <-statChannel:
statJson, _ := json.Marshal(stat)
fmt.Fprintf(w, "data: %s\n\n", statJson)
f.Flush()
case <-r.Context().Done():
statsStreamer.DeRegister(channelId)
log.Printf("[INFO] Closing Metics Stream %v\n", channelId)
return
}
}
}
}

func clusterMetricsHandler(w http.ResponseWriter, r *http.Request) {
regions, err := discoveryClient.GetClusterStatus("", "")
if err != nil {
http.Error(w, "Unable to discover peers!", http.StatusInternalServerError)
return
}
if f, ok := w.(http.Flusher); !ok {
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
return
} else {
// Set the headers related to event streaming.
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", r.Header.Get("Origin"))
w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")

statChannel := make(chan map[string]*stats.DKVMetrics, 5)
channelId := statAggregatorRegistry.Register(regions, func(region *serverpb.RegionInfo) string { return region.Database }, statChannel)
defer func() {
io.Copy(ioutil.Discard, r.Body)
r.Body.Close()
}()

// Listen to the closing of the http connection via the CloseNotifier
log.Printf("[INFO] Starting ClusterMetics Stream %v\n", channelId)
for {
select {
case stat := <-statChannel:
statJson, _ := json.Marshal(stat)
fmt.Fprintf(w, "data: %s\n\n", statJson)
f.Flush()
case <-r.Context().Done():
log.Printf("[INFO] Closing ClusterMetics Stream %v\n", channelId)
statAggregatorRegistry.DeRegister(channelId)
return
}
}
}
}
15 changes: 8 additions & 7 deletions dkvsrv.yaml
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@

node-name : "" #Name of the current Node Name
listen-addr : "0.0.0.0:8080" #listen address
role : "none" #Role of the node - master|slave|standalone
pprof : false #Enable profiling
statsd-addr : "" #StatsdD Address
verbose : false # Enable verbose logging. By default, only warnings and errors are logged.
log-level : "warn" # Use exact log level.
node-name : "" #Name of the current Node Name
listen-addr : "0.0.0.0:8080" #listen address
http-listen-addr : "0.0.0.0:8081" # http listen address
role : "none" #Role of the node - master|slave|standalone
pprof : false #Enable profiling
statsd-addr : "" #StatsdD Address
verbose : false # Enable verbose logging. By default, only warnings and errors are logged.
log-level : "warn" # Use exact log level.

db-engine : "rocksdb" #Underlying DB engine for storing data - badger|rocksdb
db-engine-ini : "rocksdb.ini" #An .ini file for configuring the underlying storage engine. Refer badger.ini or rocks.ini for more details.
Expand Down
1 change: 1 addition & 0 deletions extras/prometheus-dashboard/.env
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
PROM_DIR=./prometheus
Loading

0 comments on commit 468cf90

Please sign in to comment.