Skip to content

Commit 462ef0c

Browse files
authored
chore: leader race fixes (#28)
1 parent 45dd190 commit 462ef0c

File tree

3 files changed

+21
-8
lines changed

3 files changed

+21
-8
lines changed

Makefile

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,14 @@ all: fmt lint test
55
fmt:
66
go fmt $$(go list ./...)
77

8-
lint:
8+
lint: vet
99
golangci-lint run
1010

11+
lit: lint
12+
13+
vet:
14+
go vet $$(go list ./...)
15+
1116
test:
1217
go test -v -race -run ^Test -parallel=8 ./...
1318

leader.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,28 +45,35 @@ func (s *leaderState) tryBecomeLeader(ctx context.Context) error {
4545
}
4646

4747
func (s *leaderState) handleElection(ctx context.Context) error {
48-
if s.manager.onElected == nil {
48+
// Get callback reference under read lock
49+
s.mu.RLock()
50+
callback := s.manager.onElected
51+
s.mu.RUnlock()
52+
53+
if callback == nil {
4954
return nil
5055
}
5156

52-
if err := s.manager.onElected(ctx); err != nil {
57+
// Release lock during potentially long-running callback
58+
if err := callback(ctx); err != nil {
5359
log.Printf("Error in leader callback: %v\n", err)
54-
5560
s.setLeader(false)
5661
}
5762

5863
return nil
5964
}
6065

6166
func (s *leaderState) handleDemotion(ctx context.Context) {
62-
// Get current state before demotion
63-
wasLeader := s.getIsLeader()
67+
s.mu.Lock()
68+
defer s.mu.Unlock()
6469

65-
if wasLeader && s.manager.onDemoted != nil {
70+
// Need atomic operation because we need to check state and potentially
71+
// call callback within the same lock
72+
if s.isLeader && s.manager.onDemoted != nil {
6673
s.manager.onDemoted(ctx)
6774
}
6875

69-
s.setLeader(false)
76+
s.isLeader = false
7077
}
7178

7279
func (s *leaderState) runLeaderMaintenance(ctx context.Context) error {

leader_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ func testConcurrentMaintenanceAndDemotion(t *testing.T, state *leaderState) {
264264
// Start maintenance loop
265265
go func() {
266266
defer wGroup.Done()
267+
267268
state.setLeader(true)
268269
_ = state.runLeaderMaintenance(ctx)
269270
}()

0 commit comments

Comments
 (0)