Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prometheus Metrics + ADMIN UI #120

Closed
wants to merge 20 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fixing merge commit
jainsangam committed Aug 2, 2021
commit 04db55a06e372e3059c8f2f2d7228a71d1f6a88e
7 changes: 2 additions & 5 deletions cmd/dkvsrv/main.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package main

import (
"encoding/json"
"flag"
"fmt"
"github.com/flipkart-incubator/dkv/internal/discovery"
"gopkg.in/ini.v1"
"github.com/gorilla/mux"
"github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"io/ioutil"
@@ -323,11 +321,10 @@ func setupDKVLogger() {

func newGrpcServerListener() (*grpc.Server, net.Listener) {
grpcSrvr := grpc.NewServer(
grpc.ChainStreamInterceptor(grpc_prometheus.StreamServerInterceptor,grpc_zap.StreamServerInterceptor(accessLogger)),
grpc.ChainUnaryInterceptor(grpc_prometheus.UnaryServerInterceptor,grpc_zap.UnaryServerInterceptor(accessLogger)),
grpc.StreamInterceptor(grpc_zap.StreamServerInterceptor(accessLogger)),
grpc.UnaryInterceptor(grpc_zap.UnaryServerInterceptor(accessLogger)),
)
reflection.Register(grpcSrvr)
grpc_prometheus.Register(grpcSrvr)
return grpcSrvr, newListener()
}

5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -15,11 +15,14 @@ require (
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.5.2
github.com/golang/snappy v0.0.2 // indirect
github.com/gorilla/mux v1.8.0
github.com/grpc-ecosystem/go-grpc-middleware v1.2.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/kpango/fastime v1.0.16
github.com/matttproud/golang_protobuf_extensions v1.0.1
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.5.1 // indirect
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/procfs v0.0.10 // indirect
github.com/smartystreets/goconvey v1.6.4 // indirect
github.com/smira/go-statsd v1.3.1
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -65,8 +65,6 @@ github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 h1:7HZCaLC5+BZpm
github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4/go.mod h1:5tD+neXqOorC30/tWg0LCSkrqj/AR6gu8yY8/fpw1q0=
github.com/flipkart-incubator/gorocksdb v0.0.0-20210507064827-a2162cb9a3f7 h1:PxlANvUXyHsBPYk3O8XGekF3WXnJmhYuXwcq6wv0atc=
github.com/flipkart-incubator/gorocksdb v0.0.0-20210507064827-a2162cb9a3f7/go.mod h1:kvJSXc90Ifw0rxuTxEHKq6UH/7hQ/gd9RKCyD94ctJ0=
github.com/flipkart-incubator/nexus v0.0.0-20210727093243-32ce0a3d3391 h1:kVUSmwuKO5/X7eQyH+5fmmrP/0ZjzEXHF0arQpnbYBE=
github.com/flipkart-incubator/nexus v0.0.0-20210727093243-32ce0a3d3391/go.mod h1:p8YOMx5k0TYnOxPB04t0lY6COMRAa7CdEUYo6QIm1y4=
github.com/flipkart-incubator/nexus v0.0.0-20210730150933-87264a049c68 h1:n+5pTq7wEPpkQJdSMDU+449+qmEIFXSD12Zzi9RucCU=
github.com/flipkart-incubator/nexus v0.0.0-20210730150933-87264a049c68/go.mod h1:p8YOMx5k0TYnOxPB04t0lY6COMRAa7CdEUYo6QIm1y4=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
@@ -133,8 +131,12 @@ github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/googleapis/google-cloud-go v0.26.0/go.mod h1:yJoOdPPE9UpqbamBhJvp7Ur6OUPPV4rUY3RnssPGNBA=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/grpc-ecosystem/go-grpc-middleware v1.2.0 h1:0IKlLyQ3Hs9nDaiK5cSHAGmcQEIC8l2Ts1u6x5Dfrqg=
github.com/grpc-ecosystem/go-grpc-middleware v1.2.0/go.mod h1:mJzapYve32yjrKlk9GbyCZHuPgZsrbyIbyKhSzOpg6s=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
35 changes: 9 additions & 26 deletions internal/slave/service.go
Original file line number Diff line number Diff line change
@@ -3,18 +3,14 @@ package slave
import (
"context"
"errors"
"github.com/prometheus/client_golang/prometheus"
"io"
"strings"
"time"

"fmt"
"github.com/flipkart-incubator/dkv/internal/discovery"
"github.com/flipkart-incubator/dkv/internal/hlc"
"github.com/flipkart-incubator/dkv/internal/stats"
"github.com/flipkart-incubator/dkv/internal/storage"
"github.com/flipkart-incubator/dkv/pkg/ctl"
"github.com/flipkart-incubator/dkv/pkg/serverpb"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/emptypb"
"io"
@@ -60,24 +56,23 @@ type replInfo struct {
replConfig *ReplicationConfig
fromChngNum uint64
}

type slaveService struct {
store storage.KVStore
ca storage.ChangeApplier
lg *zap.Logger
statsCli stats.Client
stat *stat
replCli *ctl.DKVClient
replTckr *time.Ticker
replStop chan struct{}
replLag uint64
fromChngNum uint64
maxNumChngs uint32
regionInfo *serverpb.RegionInfo
clusterInfo discovery.ClusterInfoGetter
isClosed bool
replInfo *replInfo
}

type stat struct {
ReplicationLag prometheus.Gauge
}


func newStat() *stat {
repliacationLag := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "slave",
@@ -90,18 +85,6 @@ func newStat() *stat {
}
}

// TODO: check if this needs to be exposed as a flag
const maxNumChangesRepl = 10000
type slaveService struct {
store storage.KVStore
ca storage.ChangeApplier
lg *zap.Logger
statsCli stats.Client
regionInfo *serverpb.RegionInfo
clusterInfo discovery.ClusterInfoGetter
isClosed bool
replInfo *replInfo
}

// NewService creates a slave DKVService that periodically polls
// for changes from master node and replicates them onto its local
@@ -118,7 +101,7 @@ func NewService(store storage.KVStore, ca storage.ChangeApplier, lgr *zap.Logger
func newSlaveService(store storage.KVStore, ca storage.ChangeApplier, lgr *zap.Logger,
statsCli stats.Client, info *serverpb.RegionInfo, replConf *ReplicationConfig, clusterInfo discovery.ClusterInfoGetter) *slaveService {
ri := &replInfo{replConfig: replConf}
ss := &slaveService{store: store, ca: ca, lg: lgr, statsCli: statsCli,
ss := &slaveService{store: store, ca: ca, lg: lgr, statsCli: statsCli,stat: newStat(),
regionInfo: info, replInfo: ri, clusterInfo: clusterInfo}
ss.findAndConnectToMaster()
ss.startReplication()
@@ -203,7 +186,7 @@ func (ss *slaveService) pollAndApplyChanges() {
case <-ss.replInfo.replTckr.C:
ss.lg.Info("Current replication lag", zap.Uint64("ReplicationLag", ss.replInfo.replLag))
ss.statsCli.Gauge("replication.lag", int64(ss.replInfo.replLag))
ss.stat.ReplicationLag.Set(float64(ss.replLag))
ss.stat.ReplicationLag.Set(float64(ss.replInfo.replLag))
if err := ss.applyChangesFromMaster(ss.replInfo.replConfig.MaxNumChngs); err != nil {
ss.lg.Error("Unable to retrieve changes from master", zap.Error(err))
if err := ss.replaceMasterIfInactive(); err != nil {