diff --git a/gonsensus.go b/gonsensus.go index daf2d5a..66ccf7a 100644 --- a/gonsensus.go +++ b/gonsensus.go @@ -309,70 +309,24 @@ func (m *Manager) renewLock(ctx context.Context) error { } func (m *Manager) Run(ctx context.Context) error { - var isLeader bool + leaderState := &leaderState{ + manager: m, + isLeader: false, + } for { select { case <-ctx.Done(): - if isLeader && m.onDemoted != nil { - m.onDemoted(ctx) - } + leaderState.handleDemotion(ctx) return ctx.Err() - default: - if !isLeader { - err := m.acquireLock(ctx) - if err != nil { - if !errors.Is(err, ErrLockExists) { - log.Printf("Error acquiring lock: %v\n", err) - } - - time.Sleep(m.pollInterval) - - continue + if err := leaderState.runLeaderLoop(ctx); err != nil { + if !errors.Is(err, context.Canceled) { + log.Printf("Leader loop error: %v\n", err) } - isLeader = true - - if m.onElected != nil { - if err := m.onElected(ctx); err != nil { - log.Printf("Error in leader callback: %v\n", err) - - isLeader = false - - continue - } - } - } - - // Leader maintenance - ticker := time.NewTicker(m.ttl / retryIntervalDivider) - - for isLeader { - select { - case <-ctx.Done(): - ticker.Stop() - - if m.onDemoted != nil { - m.onDemoted(ctx) - } - - return ctx.Err() - - case <-ticker.C: - if err := m.renewLock(ctx); err != nil { - log.Printf("Failed to renew lock: %v\n", err) - - isLeader = false - - ticker.Stop() - - if m.onDemoted != nil { - m.onDemoted(ctx) - } - } - } + return err } time.Sleep(m.pollInterval) @@ -421,3 +375,92 @@ func (l *LockInfo) IsExpired() bool { func (l *LockInfo) IsValid() bool { return l != nil && !l.IsExpired() } + +// leaderState is an internal state machine for leader election. +type leaderState struct { + manager *Manager + isLeader bool +} + +func (s *leaderState) runLeaderLoop(ctx context.Context) error { + if !s.isLeader { + if err := s.tryBecomeLeader(ctx); err != nil { + return err + } + } + + if s.isLeader { + return s.runLeaderMaintenance(ctx) + } + + return nil +} + +func (s *leaderState) tryBecomeLeader(ctx context.Context) error { + err := s.manager.acquireLock(ctx) + if err != nil { + if !errors.Is(err, ErrLockExists) { + log.Printf("Error acquiring lock: %v\n", err) + } + + return nil + } + + s.isLeader = true + + return s.handleElection(ctx) +} + +func (s *leaderState) handleElection(ctx context.Context) error { + if s.manager.onElected == nil { + return nil + } + + if err := s.manager.onElected(ctx); err != nil { + log.Printf("Error in leader callback: %v\n", err) + + s.isLeader = false + } + + return nil +} + +func (s *leaderState) handleDemotion(ctx context.Context) { + if s.isLeader && s.manager.onDemoted != nil { + s.manager.onDemoted(ctx) + } + + s.isLeader = false +} + +func (s *leaderState) runLeaderMaintenance(ctx context.Context) error { + ticker := time.NewTicker(s.manager.ttl / retryIntervalDivider) + defer ticker.Stop() + + for s.isLeader { + select { + case <-ctx.Done(): + s.handleDemotion(ctx) + + return ctx.Err() + + case <-ticker.C: + if err := s.renewLeadership(ctx); err != nil { + return nil + } + } + } + + return nil +} + +func (s *leaderState) renewLeadership(ctx context.Context) error { + if err := s.manager.renewLock(ctx); err != nil { + log.Printf("Failed to renew lock: %v\n", err) + s.handleDemotion(ctx) + + return err + } + + return nil +}