diff --git a/clients/java/dkv-client/src/test/resources/dkv_config.yaml b/clients/java/dkv-client/src/test/resources/dkv_config.yaml index 75fc7f92..2ab8be02 100644 --- a/clients/java/dkv-client/src/test/resources/dkv_config.yaml +++ b/clients/java/dkv-client/src/test/resources/dkv_config.yaml @@ -5,7 +5,6 @@ db-engine-ini : "" block-cache-size : 3221225472 root-folder : "/tmp/dkvsrv" dc-id : "default" -disable-auto-master-disc : false discovery-service-config : "internal/discovery/discovery.ini" diskless : false pprof : false diff --git a/clients/java/dkv-client/src/test/resources/standalone_config.yaml b/clients/java/dkv-client/src/test/resources/standalone_config.yaml index 9de44ab7..8e68712b 100644 --- a/clients/java/dkv-client/src/test/resources/standalone_config.yaml +++ b/clients/java/dkv-client/src/test/resources/standalone_config.yaml @@ -3,7 +3,6 @@ db-engine-ini : "" block-cache-size : 3221225472 root-folder : "/tmp/dkvsrv" dc-id : "dc1" -disable-auto-master-disc : false discovery-service-config : "internal/discovery/discovery.ini" diskless : false pprof : false diff --git a/cmd/dkvsrv/main.go b/cmd/dkvsrv/main.go index 39ea5912..9f9334aa 100644 --- a/cmd/dkvsrv/main.go +++ b/cmd/dkvsrv/main.go @@ -177,12 +177,11 @@ func main() { // TODO - construct replConfig from region level config described in LLD maxNumChanges := uint32(10000) replConfig := &slave.ReplicationConfig{ - MaxNumChngs: maxNumChanges, - ReplPollInterval: config.ReplPollInterval, - MaxActiveReplLag: uint64(maxNumChanges * 10), - MaxActiveReplElapsed: uint64(config.ReplPollInterval.Seconds()) * 10, - DisableAutoMasterDisc: config.DisableAutoMasterDisc, - ReplMasterAddr: config.ReplicationMasterAddr, + MaxNumChngs: maxNumChanges, + ReplPollInterval: config.ReplPollInterval, + MaxActiveReplLag: uint64(maxNumChanges * 10), + MaxActiveReplElapsed: uint64(config.ReplPollInterval.Seconds()) * 10, + ReplMasterAddr: config.ReplicationMasterAddr, } dkvSvc, _ := slave.NewService(kvs, ca, regionInfo, replConfig, discoveryClient, serveropts) defer dkvSvc.Close() diff --git a/dkvsrv.yaml b/dkvsrv.yaml index b1875891..852ff56d 100644 --- a/dkvsrv.yaml +++ b/dkvsrv.yaml @@ -19,7 +19,6 @@ vbucket : "default" # Database identifier database : "default" # vBucket identifier -disable-auto-master-disc : false discovery-service-config : "internal/discovery/discovery.ini" repl-master-addr : "" #Service address of DKV master node for replication repl-poll-interval : "5s" #Interval used for polling changes from master. Eg., 10s, 5ms, 2h, etc. diff --git a/internal/opts/config.go b/internal/opts/config.go index 45ccfd65..a54ddde4 100644 --- a/internal/opts/config.go +++ b/internal/opts/config.go @@ -44,7 +44,6 @@ type Config struct { // The above issue causes replication issues during master switch due to inconsistent change numbers // Thus enabling hardcoded masters to not degrade current behaviour ReplicationMasterAddr string `mapstructure:"repl-master-addr" desc:"Service address of DKV master node for replication"` - DisableAutoMasterDisc bool `mapstructure:"disable-auto-master-disc"` // Logging vars AccessLog string `mapstructure:"access-log" desc:"File for logging DKV accesses eg., stdout, stderr, /tmp/access.log"` @@ -110,8 +109,8 @@ func (c *Config) validateFlags() { } } - if c.DbRole == "slave" && c.DisableAutoMasterDisc { - if c.ReplicationMasterAddr == "" || strings.IndexRune(c.ReplicationMasterAddr, ':') < 0 { + if c.DbRole == "slave" { + if c.ReplicationMasterAddr != "" && strings.IndexRune(c.ReplicationMasterAddr, ':') < 0 { log.Panicf("given master address: %s for replication is invalid, must be in host:port format", c.ReplicationMasterAddr) } } diff --git a/internal/slave/service.go b/internal/slave/service.go index f00e5eca..4333fc71 100644 --- a/internal/slave/service.go +++ b/internal/slave/service.go @@ -41,9 +41,6 @@ type ReplicationConfig struct { MaxActiveReplElapsed uint64 // Listener address of the master node ReplMasterAddr string - // Temporary flag to disable automatic master discovery until https://github.com/flipkart-incubator/dkv/issues/82 is fixed - // The above issue causes replication issues during master switch due to inconsistent change numbers - DisableAutoMasterDisc bool } type replInfo struct { @@ -308,8 +305,8 @@ func (ss *slaveService) GetStatus(context context.Context, request *emptypb.Empt } func (ss *slaveService) replaceMasterIfInactive() error { - if ss.replInfo.replConfig.DisableAutoMasterDisc { - return nil + if ss.replInfo.replConfig.ReplMasterAddr != "" { + return ss.reconnectMaster() // reconnect to the existing master } if regions, err := ss.clusterInfo.GetClusterStatus(ss.regionInfo.GetDatabase(), ss.regionInfo.GetVBucket()); err == nil { var currentMaster *serverpb.RegionInfo = nil @@ -353,6 +350,7 @@ func (ss *slaveService) findAndConnectToMaster() error { ss.replInfo.replCli = replCli ss.replInfo.replConfig.ReplMasterAddr = *master ss.replInfo.replActive = true + ss.serveropts.Logger.Warn("Started replication client with master", zap.String("MasterIP", *master)) } else { ss.serveropts.Logger.Warn("Unable to create a replication client", zap.Error(err)) return err @@ -369,7 +367,7 @@ func (ss *slaveService) findAndConnectToMaster() error { // followed by followers outside DC, followed by master outside DC // TODO - rather than randomly selecting a master from applicable followers, load balance to distribute better func (ss *slaveService) findNewMaster() (*string, error) { - if ss.replInfo.replConfig.DisableAutoMasterDisc { + if ss.replInfo.replConfig.ReplMasterAddr != "" { return &ss.replInfo.replConfig.ReplMasterAddr, nil } // Get all active regions diff --git a/internal/slave/service_test.go b/internal/slave/service_test.go index c61e6985..5151b4d5 100644 --- a/internal/slave/service_test.go +++ b/internal/slave/service_test.go @@ -281,7 +281,7 @@ func startSlaveAndAttachToMaster(client *ctl.DKVClient) { wg := sync.WaitGroup{} wg.Add(1) rdbStore := newRocksDBStore(dbFolderSlave) - go serveStandaloneDKVSlave(&wg, rdbStore, rdbStore, client, false, discoveryCli) + go serveStandaloneDKVSlave(&wg, rdbStore, rdbStore, client, discoveryCli) wg.Wait() // stop the slave poller so as to avoid race with this poller @@ -458,7 +458,7 @@ func TestLargePayloadsDuringRepl(t *testing.T) { defer masterGrpcSrvr.GracefulStop() wg.Add(1) - go serveStandaloneDKVSlave(&wg, slaveRDB, slaveRDB, masterCli, false, testingClusterInfo{}) + go serveStandaloneDKVSlave(&wg, slaveRDB, slaveRDB, masterCli, testingClusterInfo{}) wg.Wait() // stop the slave poller so as to avoid race with this poller @@ -516,7 +516,7 @@ func initMasterAndSlaves(masterStore, slaveStore storage.KVStore, cp storage.Cha masterCli = newDKVClient(masterSvcPort) wg.Add(1) - go serveStandaloneDKVSlave(&wg, slaveStore, ca, masterCli, false, testingClusterInfo{}) + go serveStandaloneDKVSlave(&wg, slaveStore, ca, masterCli, testingClusterInfo{}) wg.Wait() // stop the slave poller so as to avoid race with this poller @@ -886,14 +886,13 @@ func serveStandaloneDKVMaster(wg *sync.WaitGroup, store storage.KVStore, cp stor masterGrpcSrvr.Serve(lis) } -func serveStandaloneDKVSlave(wg *sync.WaitGroup, store storage.KVStore, ca storage.ChangeApplier, masterCli *ctl.DKVClient, disableAutoMasterDisc bool, discoveryClient discovery.Client) { +func serveStandaloneDKVSlave(wg *sync.WaitGroup, store storage.KVStore, ca storage.ChangeApplier, masterCli *ctl.DKVClient, discoveryClient discovery.Client) { lgr, _ := zap.NewDevelopment() replConf := ReplicationConfig{ - MaxNumChngs: 2, - ReplPollInterval: 5 * time.Second, - MaxActiveReplLag: 10, - MaxActiveReplElapsed: 5, - DisableAutoMasterDisc: disableAutoMasterDisc, + MaxNumChngs: 2, + ReplPollInterval: 5 * time.Second, + MaxActiveReplLag: 10, + MaxActiveReplElapsed: 5, } specialOpts := &opts.ServerOpts{