Skip to content

Commit

Permalink
fixing merge commit
Browse files Browse the repository at this point in the history
  • Loading branch information
jainsangam committed Aug 2, 2021
1 parent 8bb15c7 commit 7f36642
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 33 deletions.
6 changes: 2 additions & 4 deletions cmd/dkvsrv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/flipkart-incubator/dkv/internal/discovery"
"gopkg.in/ini.v1"
"github.com/gorilla/mux"
"github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"io/ioutil"
Expand Down Expand Up @@ -323,11 +322,10 @@ func setupDKVLogger() {

func newGrpcServerListener() (*grpc.Server, net.Listener) {
grpcSrvr := grpc.NewServer(
grpc.ChainStreamInterceptor(grpc_prometheus.StreamServerInterceptor,grpc_zap.StreamServerInterceptor(accessLogger)),
grpc.ChainUnaryInterceptor(grpc_prometheus.UnaryServerInterceptor,grpc_zap.UnaryServerInterceptor(accessLogger)),
grpc.StreamInterceptor(grpc_zap.StreamServerInterceptor(accessLogger)),
grpc.UnaryInterceptor(grpc_zap.UnaryServerInterceptor(accessLogger)),
)
reflection.Register(grpcSrvr)
grpc_prometheus.Register(grpcSrvr)
return grpcSrvr, newListener()
}

Expand Down
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ require (
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.5.2
github.com/golang/snappy v0.0.2 // indirect
github.com/gorilla/mux v1.8.0
github.com/grpc-ecosystem/go-grpc-middleware v1.2.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/kpango/fastime v1.0.16
github.com/matttproud/golang_protobuf_extensions v1.0.1
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.5.1 // indirect
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/procfs v0.0.10 // indirect
github.com/smartystreets/goconvey v1.6.4 // indirect
github.com/smira/go-statsd v1.3.1
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 h1:7HZCaLC5+BZpm
github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4/go.mod h1:5tD+neXqOorC30/tWg0LCSkrqj/AR6gu8yY8/fpw1q0=
github.com/flipkart-incubator/gorocksdb v0.0.0-20210507064827-a2162cb9a3f7 h1:PxlANvUXyHsBPYk3O8XGekF3WXnJmhYuXwcq6wv0atc=
github.com/flipkart-incubator/gorocksdb v0.0.0-20210507064827-a2162cb9a3f7/go.mod h1:kvJSXc90Ifw0rxuTxEHKq6UH/7hQ/gd9RKCyD94ctJ0=
github.com/flipkart-incubator/nexus v0.0.0-20210727093243-32ce0a3d3391 h1:kVUSmwuKO5/X7eQyH+5fmmrP/0ZjzEXHF0arQpnbYBE=
github.com/flipkart-incubator/nexus v0.0.0-20210727093243-32ce0a3d3391/go.mod h1:p8YOMx5k0TYnOxPB04t0lY6COMRAa7CdEUYo6QIm1y4=
github.com/flipkart-incubator/nexus v0.0.0-20210730150933-87264a049c68 h1:n+5pTq7wEPpkQJdSMDU+449+qmEIFXSD12Zzi9RucCU=
github.com/flipkart-incubator/nexus v0.0.0-20210730150933-87264a049c68/go.mod h1:p8YOMx5k0TYnOxPB04t0lY6COMRAa7CdEUYo6QIm1y4=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
Expand Down Expand Up @@ -133,8 +131,12 @@ github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/googleapis/google-cloud-go v0.26.0/go.mod h1:yJoOdPPE9UpqbamBhJvp7Ur6OUPPV4rUY3RnssPGNBA=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/grpc-ecosystem/go-grpc-middleware v1.2.0 h1:0IKlLyQ3Hs9nDaiK5cSHAGmcQEIC8l2Ts1u6x5Dfrqg=
github.com/grpc-ecosystem/go-grpc-middleware v1.2.0/go.mod h1:mJzapYve32yjrKlk9GbyCZHuPgZsrbyIbyKhSzOpg6s=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
Expand Down
35 changes: 9 additions & 26 deletions internal/slave/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,14 @@ package slave
import (
"context"
"errors"
"github.com/prometheus/client_golang/prometheus"
"io"
"strings"
"time"

"fmt"
"github.com/flipkart-incubator/dkv/internal/discovery"
"github.com/flipkart-incubator/dkv/internal/hlc"
"github.com/flipkart-incubator/dkv/internal/stats"
"github.com/flipkart-incubator/dkv/internal/storage"
"github.com/flipkart-incubator/dkv/pkg/ctl"
"github.com/flipkart-incubator/dkv/pkg/serverpb"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/emptypb"
"io"
Expand Down Expand Up @@ -60,24 +56,23 @@ type replInfo struct {
replConfig *ReplicationConfig
fromChngNum uint64
}

type slaveService struct {
store storage.KVStore
ca storage.ChangeApplier
lg *zap.Logger
statsCli stats.Client
stat *stat
replCli *ctl.DKVClient
replTckr *time.Ticker
replStop chan struct{}
replLag uint64
fromChngNum uint64
maxNumChngs uint32
regionInfo *serverpb.RegionInfo
clusterInfo discovery.ClusterInfoGetter
isClosed bool
replInfo *replInfo
}

type stat struct {
ReplicationLag prometheus.Gauge
}


func newStat() *stat {
repliacationLag := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "slave",
Expand All @@ -90,18 +85,6 @@ func newStat() *stat {
}
}

// TODO: check if this needs to be exposed as a flag
const maxNumChangesRepl = 10000
type slaveService struct {
store storage.KVStore
ca storage.ChangeApplier
lg *zap.Logger
statsCli stats.Client
regionInfo *serverpb.RegionInfo
clusterInfo discovery.ClusterInfoGetter
isClosed bool
replInfo *replInfo
}

// NewService creates a slave DKVService that periodically polls
// for changes from master node and replicates them onto its local
Expand All @@ -118,7 +101,7 @@ func NewService(store storage.KVStore, ca storage.ChangeApplier, lgr *zap.Logger
func newSlaveService(store storage.KVStore, ca storage.ChangeApplier, lgr *zap.Logger,
statsCli stats.Client, info *serverpb.RegionInfo, replConf *ReplicationConfig, clusterInfo discovery.ClusterInfoGetter) *slaveService {
ri := &replInfo{replConfig: replConf}
ss := &slaveService{store: store, ca: ca, lg: lgr, statsCli: statsCli,
ss := &slaveService{store: store, ca: ca, lg: lgr, statsCli: statsCli,stat: newStat(),
regionInfo: info, replInfo: ri, clusterInfo: clusterInfo}
ss.findAndConnectToMaster()
ss.startReplication()
Expand Down Expand Up @@ -203,7 +186,7 @@ func (ss *slaveService) pollAndApplyChanges() {
case <-ss.replInfo.replTckr.C:
ss.lg.Info("Current replication lag", zap.Uint64("ReplicationLag", ss.replInfo.replLag))
ss.statsCli.Gauge("replication.lag", int64(ss.replInfo.replLag))
ss.stat.ReplicationLag.Set(float64(ss.replLag))
ss.stat.ReplicationLag.Set(float64(ss.replInfo.replLag))
if err := ss.applyChangesFromMaster(ss.replInfo.replConfig.MaxNumChngs); err != nil {
ss.lg.Error("Unable to retrieve changes from master", zap.Error(err))
if err := ss.replaceMasterIfInactive(); err != nil {
Expand Down

0 comments on commit 7f36642

Please sign in to comment.