Skip to content

Commit

Permalink
Simplify AutoMasterDisc Flag (#137)
Browse files Browse the repository at this point in the history
  • Loading branch information
kingster authored Apr 13, 2022
1 parent b84c1f2 commit 2b1be08
Show file tree
Hide file tree
Showing 7 changed files with 19 additions and 27 deletions.
1 change: 0 additions & 1 deletion clients/java/dkv-client/src/test/resources/dkv_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ db-engine-ini : ""
block-cache-size : 3221225472
root-folder : "/tmp/dkvsrv"
dc-id : "default"
disable-auto-master-disc : false
discovery-service-config : "internal/discovery/discovery.ini"
diskless : false
pprof : false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ db-engine-ini : ""
block-cache-size : 3221225472
root-folder : "/tmp/dkvsrv"
dc-id : "dc1"
disable-auto-master-disc : false
discovery-service-config : "internal/discovery/discovery.ini"
diskless : false
pprof : false
Expand Down
11 changes: 5 additions & 6 deletions cmd/dkvsrv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,11 @@ func main() {
// TODO - construct replConfig from region level config described in LLD
maxNumChanges := uint32(10000)
replConfig := &slave.ReplicationConfig{
MaxNumChngs: maxNumChanges,
ReplPollInterval: config.ReplPollInterval,
MaxActiveReplLag: uint64(maxNumChanges * 10),
MaxActiveReplElapsed: uint64(config.ReplPollInterval.Seconds()) * 10,
DisableAutoMasterDisc: config.DisableAutoMasterDisc,
ReplMasterAddr: config.ReplicationMasterAddr,
MaxNumChngs: maxNumChanges,
ReplPollInterval: config.ReplPollInterval,
MaxActiveReplLag: uint64(maxNumChanges * 10),
MaxActiveReplElapsed: uint64(config.ReplPollInterval.Seconds()) * 10,
ReplMasterAddr: config.ReplicationMasterAddr,
}
dkvSvc, _ := slave.NewService(kvs, ca, regionInfo, replConfig, discoveryClient, serveropts)
defer dkvSvc.Close()
Expand Down
1 change: 0 additions & 1 deletion dkvsrv.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ vbucket : "default" # Database identifier
database : "default" # vBucket identifier


disable-auto-master-disc : false
discovery-service-config : "internal/discovery/discovery.ini"
repl-master-addr : "" #Service address of DKV master node for replication
repl-poll-interval : "5s" #Interval used for polling changes from master. Eg., 10s, 5ms, 2h, etc.
Expand Down
5 changes: 2 additions & 3 deletions internal/opts/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ type Config struct {
// The above issue causes replication issues during master switch due to inconsistent change numbers
// Thus enabling hardcoded masters to not degrade current behaviour
ReplicationMasterAddr string `mapstructure:"repl-master-addr" desc:"Service address of DKV master node for replication"`
DisableAutoMasterDisc bool `mapstructure:"disable-auto-master-disc"`

// Logging vars
AccessLog string `mapstructure:"access-log" desc:"File for logging DKV accesses eg., stdout, stderr, /tmp/access.log"`
Expand Down Expand Up @@ -110,8 +109,8 @@ func (c *Config) validateFlags() {
}
}

if c.DbRole == "slave" && c.DisableAutoMasterDisc {
if c.ReplicationMasterAddr == "" || strings.IndexRune(c.ReplicationMasterAddr, ':') < 0 {
if c.DbRole == "slave" {
if c.ReplicationMasterAddr != "" && strings.IndexRune(c.ReplicationMasterAddr, ':') < 0 {
log.Panicf("given master address: %s for replication is invalid, must be in host:port format", c.ReplicationMasterAddr)
}
}
Expand Down
10 changes: 4 additions & 6 deletions internal/slave/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ type ReplicationConfig struct {
MaxActiveReplElapsed uint64
// Listener address of the master node
ReplMasterAddr string
// Temporary flag to disable automatic master discovery until https://github.com/flipkart-incubator/dkv/issues/82 is fixed
// The above issue causes replication issues during master switch due to inconsistent change numbers
DisableAutoMasterDisc bool
}

type replInfo struct {
Expand Down Expand Up @@ -308,8 +305,8 @@ func (ss *slaveService) GetStatus(context context.Context, request *emptypb.Empt
}

func (ss *slaveService) replaceMasterIfInactive() error {
if ss.replInfo.replConfig.DisableAutoMasterDisc {
return nil
if ss.replInfo.replConfig.ReplMasterAddr != "" {
return ss.reconnectMaster() // reconnect to the existing master
}
if regions, err := ss.clusterInfo.GetClusterStatus(ss.regionInfo.GetDatabase(), ss.regionInfo.GetVBucket()); err == nil {
var currentMaster *serverpb.RegionInfo = nil
Expand Down Expand Up @@ -353,6 +350,7 @@ func (ss *slaveService) findAndConnectToMaster() error {
ss.replInfo.replCli = replCli
ss.replInfo.replConfig.ReplMasterAddr = *master
ss.replInfo.replActive = true
ss.serveropts.Logger.Warn("Started replication client with master", zap.String("MasterIP", *master))
} else {
ss.serveropts.Logger.Warn("Unable to create a replication client", zap.Error(err))
return err
Expand All @@ -369,7 +367,7 @@ func (ss *slaveService) findAndConnectToMaster() error {
// followed by followers outside DC, followed by master outside DC
// TODO - rather than randomly selecting a master from applicable followers, load balance to distribute better
func (ss *slaveService) findNewMaster() (*string, error) {
if ss.replInfo.replConfig.DisableAutoMasterDisc {
if ss.replInfo.replConfig.ReplMasterAddr != "" {
return &ss.replInfo.replConfig.ReplMasterAddr, nil
}
// Get all active regions
Expand Down
17 changes: 8 additions & 9 deletions internal/slave/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func startSlaveAndAttachToMaster(client *ctl.DKVClient) {
wg := sync.WaitGroup{}
wg.Add(1)
rdbStore := newRocksDBStore(dbFolderSlave)
go serveStandaloneDKVSlave(&wg, rdbStore, rdbStore, client, false, discoveryCli)
go serveStandaloneDKVSlave(&wg, rdbStore, rdbStore, client, discoveryCli)
wg.Wait()

// stop the slave poller so as to avoid race with this poller
Expand Down Expand Up @@ -458,7 +458,7 @@ func TestLargePayloadsDuringRepl(t *testing.T) {
defer masterGrpcSrvr.GracefulStop()

wg.Add(1)
go serveStandaloneDKVSlave(&wg, slaveRDB, slaveRDB, masterCli, false, testingClusterInfo{})
go serveStandaloneDKVSlave(&wg, slaveRDB, slaveRDB, masterCli, testingClusterInfo{})
wg.Wait()

// stop the slave poller so as to avoid race with this poller
Expand Down Expand Up @@ -516,7 +516,7 @@ func initMasterAndSlaves(masterStore, slaveStore storage.KVStore, cp storage.Cha
masterCli = newDKVClient(masterSvcPort)

wg.Add(1)
go serveStandaloneDKVSlave(&wg, slaveStore, ca, masterCli, false, testingClusterInfo{})
go serveStandaloneDKVSlave(&wg, slaveStore, ca, masterCli, testingClusterInfo{})
wg.Wait()

// stop the slave poller so as to avoid race with this poller
Expand Down Expand Up @@ -886,14 +886,13 @@ func serveStandaloneDKVMaster(wg *sync.WaitGroup, store storage.KVStore, cp stor
masterGrpcSrvr.Serve(lis)
}

func serveStandaloneDKVSlave(wg *sync.WaitGroup, store storage.KVStore, ca storage.ChangeApplier, masterCli *ctl.DKVClient, disableAutoMasterDisc bool, discoveryClient discovery.Client) {
func serveStandaloneDKVSlave(wg *sync.WaitGroup, store storage.KVStore, ca storage.ChangeApplier, masterCli *ctl.DKVClient, discoveryClient discovery.Client) {
lgr, _ := zap.NewDevelopment()
replConf := ReplicationConfig{
MaxNumChngs: 2,
ReplPollInterval: 5 * time.Second,
MaxActiveReplLag: 10,
MaxActiveReplElapsed: 5,
DisableAutoMasterDisc: disableAutoMasterDisc,
MaxNumChngs: 2,
ReplPollInterval: 5 * time.Second,
MaxActiveReplLag: 10,
MaxActiveReplElapsed: 5,
}

specialOpts := &opts.ServerOpts{
Expand Down

0 comments on commit 2b1be08

Please sign in to comment.