Skip to content

Commit

Permalink
use the safer atomic wrappers
Browse files Browse the repository at this point in the history
  • Loading branch information
flowerinthenight committed Jul 16, 2024
1 parent fa6a8e8 commit 838ff50
Showing 1 changed file with 26 additions and 24 deletions.
50 changes: 26 additions & 24 deletions spindle.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,17 @@ type Lock struct {
name string // lock name
id string // unique id for this instance
duration int64 // lock duration in ms
iter int64
iter atomic.Int64
token *time.Time
mtx *sync.Mutex
logger *log.Logger
active int32
active atomic.Int32
}

// Run starts the main lock loop which can be canceled using the input context. You can
// provide an optional done channel if you want to be notified when the loop is done.
func (l *Lock) Run(ctx context.Context, done ...chan error) error {
var leader int32 // for heartbeat
var leader atomic.Int32 // for heartbeat
go func() {
min := (time.Millisecond * time.Duration(l.duration)) / 2
bo := gaxv2.Backoff{Max: time.Millisecond * time.Duration(l.duration)}
Expand All @@ -71,7 +71,7 @@ func (l *Lock) Run(ctx context.Context, done ...chan error) error {
}

switch {
case atomic.LoadInt32(&leader) > 0:
case leader.Load() > 0:
var tm time.Duration
for {
tm = bo.Pause()
Expand All @@ -97,8 +97,8 @@ func (l *Lock) Run(ctx context.Context, done ...chan error) error {
}

if l.tokenString() != "" && l.tokenString() == tokenLocked {
atomic.AddInt32(&leader, 1)
if atomic.LoadInt32(&leader) == 1 {
leader.Add(1)
if leader.Load() == 1 {
l.heartbeat() // only on 1
}

Expand All @@ -120,33 +120,34 @@ func (l *Lock) Run(ctx context.Context, done ...chan error) error {

if ok {
l.logger.Println("leader active (not me)")
atomic.StoreInt32(&leader, 0) // reset heartbeat
leader.Store(0) // reset heartbeat
return true
}
}

return false // lock available
}

var initial int32 = 1
var active int32
var initial atomic.Int32
var active atomic.Int32
initial.Store(1)

attemptLeader := func() {
defer func(begin time.Time) {
l.logger.Printf("round %v took %v", l.Iterations(), time.Since(begin))
atomic.StoreInt32(&l.active, 1) // global
atomic.StoreInt32(&active, 0) // local
atomic.AddInt64(&l.iter, 1)
l.active.Store(1) // global
active.Store(0) // local
l.iter.Add(1)
}(time.Now())

atomic.StoreInt32(&active, 1) // local
active.Store(1) // local
if locked() {
return
}

// Attempt first ever lock. The return commit timestamp will be our fencing token.
// Only one node should be able to do this successfully.
if atomic.LoadInt32(&initial) == 1 {
// Attempt first ever lock. The return commit timestamp will be our fencing
// token. Only one node should be able to do this successfully.
if initial.Load() == 1 {
prefix := "[init]"
cts, err := l.db.ReadWriteTransaction(context.Background(),
func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
Expand All @@ -169,11 +170,11 @@ func (l *Lock) Run(ctx context.Context, done ...chan error) error {
return
}

atomic.StoreInt32(&initial, 0)
initial.Store(0)
}

// For the succeeding lock attempts.
if atomic.LoadInt32(&initial) == 0 {
if initial.Load() == 0 {
prefix := "[next]"
token, _, err := l.getCurrentTokenAndId()
if err != nil {
Expand All @@ -196,7 +197,8 @@ func (l *Lock) Run(ctx context.Context, done ...chan error) error {
fmt.Fprintf(&q, "values ('%s')", nxt)
_, err := txn.Update(ctx, spanner.Statement{SQL: q.String()})
return err
})
},
)

if err == nil {
// We got the lock. Attempt to update the current token to this commit timestamp.
Expand Down Expand Up @@ -244,7 +246,7 @@ func (l *Lock) Run(ctx context.Context, done ...chan error) error {
case <-first: // immediately before 1st tick
go attemptLeader()
case <-ticker.C: // duration heartbeat
if atomic.LoadInt32(&active) == 1 {
if active.Load() == 1 {
continue
}

Expand All @@ -254,7 +256,7 @@ func (l *Lock) Run(ctx context.Context, done ...chan error) error {
done[0] <- nil
}

atomic.StoreInt32(&l.active, 0) // global
l.active.Store(0) // global
ticker.Stop()
return
}
Expand All @@ -266,7 +268,7 @@ func (l *Lock) Run(ctx context.Context, done ...chan error) error {

// HasLock returns true if this instance got the lock, together with the lock token.
func (l *Lock) HasLock() (bool, string) {
if atomic.LoadInt32(&l.active) == 0 {
if l.active.Load() == 0 {
return false, ""
}

Expand All @@ -284,7 +286,7 @@ func (l *Lock) HasLock() (bool, string) {

// Leader returns the current leader id.
func (l *Lock) Leader() (string, error) {
if atomic.LoadInt32(&l.active) == 0 {
if l.active.Load() == 0 {
return "", ErrNotRunning
}

Expand All @@ -296,7 +298,7 @@ func (l *Lock) Leader() (string, error) {
func (l *Lock) Duration() int64 { return l.duration }

// Iterations returns the number of iterations done by the main loop.
func (l *Lock) Iterations() int64 { return atomic.LoadInt64(&l.iter) }
func (l *Lock) Iterations() int64 { return l.iter.Load() }

// Client returns the Spanner client.
func (l *Lock) Client() *spanner.Client { return l.db }
Expand Down

0 comments on commit 838ff50

Please sign in to comment.