Skip to content

Commit

Permalink
refactor run loop (#1)
Browse files Browse the repository at this point in the history
* refactor run loop

* lint
  • Loading branch information
thevilledev authored Oct 27, 2024
1 parent a601d76 commit 70e5846
Showing 1 changed file with 98 additions and 55 deletions.
153 changes: 98 additions & 55 deletions gonsensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,70 +309,24 @@ func (m *Manager) renewLock(ctx context.Context) error {
}

func (m *Manager) Run(ctx context.Context) error {
var isLeader bool
leaderState := &leaderState{
manager: m,
isLeader: false,
}

for {
select {
case <-ctx.Done():
if isLeader && m.onDemoted != nil {
m.onDemoted(ctx)
}
leaderState.handleDemotion(ctx)

return ctx.Err()

default:
if !isLeader {
err := m.acquireLock(ctx)
if err != nil {
if !errors.Is(err, ErrLockExists) {
log.Printf("Error acquiring lock: %v\n", err)
}

time.Sleep(m.pollInterval)

continue
if err := leaderState.runLeaderLoop(ctx); err != nil {
if !errors.Is(err, context.Canceled) {
log.Printf("Leader loop error: %v\n", err)
}

isLeader = true

if m.onElected != nil {
if err := m.onElected(ctx); err != nil {
log.Printf("Error in leader callback: %v\n", err)

isLeader = false

continue
}
}
}

// Leader maintenance
ticker := time.NewTicker(m.ttl / retryIntervalDivider)

for isLeader {
select {
case <-ctx.Done():
ticker.Stop()

if m.onDemoted != nil {
m.onDemoted(ctx)
}

return ctx.Err()

case <-ticker.C:
if err := m.renewLock(ctx); err != nil {
log.Printf("Failed to renew lock: %v\n", err)

isLeader = false

ticker.Stop()

if m.onDemoted != nil {
m.onDemoted(ctx)
}
}
}
return err
}

time.Sleep(m.pollInterval)
Expand Down Expand Up @@ -421,3 +375,92 @@ func (l *LockInfo) IsExpired() bool {
func (l *LockInfo) IsValid() bool {
return l != nil && !l.IsExpired()
}

// leaderState is an internal state machine for leader election.
type leaderState struct {
manager *Manager
isLeader bool
}

func (s *leaderState) runLeaderLoop(ctx context.Context) error {
if !s.isLeader {
if err := s.tryBecomeLeader(ctx); err != nil {
return err
}
}

if s.isLeader {
return s.runLeaderMaintenance(ctx)
}

return nil
}

func (s *leaderState) tryBecomeLeader(ctx context.Context) error {
err := s.manager.acquireLock(ctx)
if err != nil {
if !errors.Is(err, ErrLockExists) {
log.Printf("Error acquiring lock: %v\n", err)
}

return nil
}

s.isLeader = true

return s.handleElection(ctx)
}

func (s *leaderState) handleElection(ctx context.Context) error {
if s.manager.onElected == nil {
return nil
}

if err := s.manager.onElected(ctx); err != nil {
log.Printf("Error in leader callback: %v\n", err)

s.isLeader = false
}

return nil
}

func (s *leaderState) handleDemotion(ctx context.Context) {
if s.isLeader && s.manager.onDemoted != nil {
s.manager.onDemoted(ctx)
}

s.isLeader = false
}

func (s *leaderState) runLeaderMaintenance(ctx context.Context) error {
ticker := time.NewTicker(s.manager.ttl / retryIntervalDivider)
defer ticker.Stop()

for s.isLeader {
select {
case <-ctx.Done():
s.handleDemotion(ctx)

return ctx.Err()

case <-ticker.C:
if err := s.renewLeadership(ctx); err != nil {
return nil
}
}
}

return nil
}

func (s *leaderState) renewLeadership(ctx context.Context) error {
if err := s.manager.renewLock(ctx); err != nil {
log.Printf("Failed to renew lock: %v\n", err)
s.handleDemotion(ctx)

return err
}

return nil
}

0 comments on commit 70e5846

Please sign in to comment.