Skip to content

Commit

Permalink
Feat: support exit-on-lost-leader flag (#868)
Browse files Browse the repository at this point in the history
  • Loading branch information
chivalryq authored Aug 8, 2023
1 parent 8438aa7 commit 7937d09
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 4 deletions.
7 changes: 6 additions & 1 deletion pkg/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ type Config struct {
PluginConfig PluginConfig

DexServerURL string

// ExitOnLostLeader will exit the process if this server lost the leader election, set this to true for debugging
ExitOnLostLeader bool
}

// PluginConfig the plugin directory config
Expand Down Expand Up @@ -95,7 +98,8 @@ func NewConfig() *Config {
CorePluginPath: "core-plugins",
CustomPluginPath: []string{"plugins"},
},
DexServerURL: "http://dex.vela-system:5556",
DexServerURL: "http://dex.vela-system:5556",
ExitOnLostLeader: true,
}
}

Expand Down Expand Up @@ -127,5 +131,6 @@ func (s *Config) AddFlags(fs *pflag.FlagSet, c *Config) {
fs.StringVar(&s.WorkflowVersion, "workflow-version", c.WorkflowVersion, "the version of workflow to meet controller requirement.")
fs.StringVar(&s.DexServerURL, "dex-server", c.DexServerURL, "the URL of the dex server.")
fs.StringArrayVar(&s.PluginConfig.CustomPluginPath, "plugin-path", c.PluginConfig.CustomPluginPath, "the path of the plugin directory")
fs.BoolVar(&s.ExitOnLostLeader, "exit-on-lost-leader", c.ExitOnLostLeader, "exit the process if this server lost the leader election")
profiling.AddFlags(fs)
}
2 changes: 1 addition & 1 deletion pkg/server/domain/service/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ func (w *workflowServiceImpl) SyncWorkflowRecord(ctx context.Context, appPrimary
record.Finished = "true"
record.Status = model.RevisionStatusFail
if err := w.Store.Put(ctx, record); err != nil {
return fmt.Errorf(("failed to set the record status to terminated: %s"), err.Error())
return fmt.Errorf("failed to set the record status to terminated: %s", err.Error())
}
return bcode.ErrApplicationRevisionNotExist
}
Expand Down
9 changes: 8 additions & 1 deletion pkg/server/event/sync/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,14 @@ func (a *ApplicationSync) Start(ctx context.Context, errorChan chan error) {
}
app := item.(*v1beta1.Application)
if err := cu.AddOrUpdate(ctx, app); err != nil {
klog.Errorf("fail to add or update application %s: %s", app.Name, err.Error())
failTimes := a.Queue.NumRequeues(app)
klog.Errorf("fail to add or update application %s: %s, requeue times: %d", app.Name, err.Error(), failTimes)
if failTimes < 5 {
a.Queue.AddRateLimited(app)
} else {
klog.Errorf("fail to add or update application %s: %s, requeue times reach the limit(%d), give up", app.Name, err.Error(), failTimes)
a.Queue.Forget(app)
}
}
a.Queue.Done(app)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,9 @@ func (s *restServer) setupLeaderElection(errChan chan error) (*leaderelection.Le
go event.StartEventWorker(ctx, errChan)
},
OnStoppedLeading: func() {
errChan <- fmt.Errorf("leader lost %s", s.cfg.LeaderConfig.ID)
if s.cfg.ExitOnLostLeader {
errChan <- fmt.Errorf("leader lost %s", s.cfg.LeaderConfig.ID)
}
},
OnNewLeader: func(identity string) {
if identity == s.cfg.LeaderConfig.ID {
Expand Down

0 comments on commit 7937d09

Please sign in to comment.