Skip to content

Commit

Permalink
v2: use uint64 for token instead of timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
flowerinthenight committed Jul 16, 2024
1 parent 838ff50 commit 9fe5bca
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 29 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.21

require (
cloud.google.com/go/spanner v1.64.0
github.com/cespare/xxhash/v2 v2.3.0
github.com/google/uuid v1.6.0
github.com/googleapis/gax-go/v2 v2.12.5
google.golang.org/api v0.187.0
Expand All @@ -19,7 +20,6 @@ require (
cloud.google.com/go/longrunning v0.5.9 // indirect
github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp v1.5.0 // indirect
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b // indirect
github.com/envoyproxy/go-control-plane v0.12.0 // indirect
github.com/envoyproxy/protoc-gen-validate v1.0.4 // indirect
Expand Down
61 changes: 33 additions & 28 deletions spindle.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"cloud.google.com/go/spanner"
"github.com/cespare/xxhash/v2"
"github.com/google/uuid"
gaxv2 "github.com/googleapis/gax-go/v2"
"google.golang.org/api/iterator"
Expand Down Expand Up @@ -52,7 +53,7 @@ type Lock struct {
id string // unique id for this instance
duration int64 // lock duration in ms
iter atomic.Int64
token *time.Time
ttoken *time.Time
mtx *sync.Mutex
logger *log.Logger
active atomic.Int32
Expand All @@ -64,7 +65,10 @@ func (l *Lock) Run(ctx context.Context, done ...chan error) error {
var leader atomic.Int32 // for heartbeat
go func() {
min := (time.Millisecond * time.Duration(l.duration)) / 2
bo := gaxv2.Backoff{Max: time.Millisecond * time.Duration(l.duration)}
bo := gaxv2.Backoff{
Max: time.Millisecond * time.Duration(l.duration),
}

for {
if ctx.Err() == context.Canceled {
return // not foolproof due to delay
Expand All @@ -89,14 +93,14 @@ func (l *Lock) Run(ctx context.Context, done ...chan error) error {
}()

locked := func() bool {
// See if there is an active leased lock (could be us, could be somebody else).
tokenLocked, diff, err := l.checkLock()
// See if there is an active leased lock (could be us, could be someone else).
token, diff, err := l.checkLock()
if err != nil {
l.logger.Println(err)
return true // err on safer side
}

if l.tokenString() != "" && l.tokenString() == tokenLocked {
if l.token() == token {
leader.Add(1)
if leader.Load() == 1 {
l.heartbeat() // only on 1
Expand Down Expand Up @@ -166,7 +170,7 @@ func (l *Lock) Run(ctx context.Context, done ...chan error) error {

if err == nil {
l.setToken(&cts)
l.logger.Printf("%v got the lock with token [%v]", prefix, l.tokenString())
l.logger.Printf("%v got the lock with token [%v]", prefix, l.token())
return
}

Expand All @@ -176,13 +180,13 @@ func (l *Lock) Run(ctx context.Context, done ...chan error) error {
// For the succeeding lock attempts.
if initial.Load() == 0 {
prefix := "[next]"
token, _, err := l.getCurrentTokenAndId()
token, _, err := l.getCurrentToken()
if err != nil {
l.logger.Printf("%v getCurrentTokenAndId failed: %v", prefix, err)
l.logger.Printf("%v getCurrentToken failed: %v", prefix, err)
return
}

if token == "" {
if token == 0 {
l.logger.Printf("%v read token failed: empty", prefix)
return
}
Expand Down Expand Up @@ -230,7 +234,7 @@ func (l *Lock) Run(ctx context.Context, done ...chan error) error {
}

l.setToken(&nts) // doesn't mean we're leader
l.logger.Printf("%v got the lock: token=%v", prefix, l.tokenString())
l.logger.Printf("%v got the lock: token=%v", prefix, l.token())
}
}
}
Expand Down Expand Up @@ -267,17 +271,17 @@ func (l *Lock) Run(ctx context.Context, done ...chan error) error {
}

// HasLock returns true if this instance got the lock, together with the lock token.
func (l *Lock) HasLock() (bool, string) {
func (l *Lock) HasLock() (bool, uint64) {
if l.active.Load() == 0 {
return false, ""
return false, 0
}

token, _, err := l.getCurrentTokenAndId()
token, _, err := l.getCurrentToken()
if err != nil {
return false, token
}

if token != "" && token == l.tokenString() {
if token == l.token() {
return true, token
}

Expand All @@ -290,7 +294,7 @@ func (l *Lock) Leader() (string, error) {
return "", ErrNotRunning
}

_, w, err := l.getCurrentTokenAndId()
_, w, err := l.getCurrentToken()
return w, err
}

Expand All @@ -303,29 +307,30 @@ func (l *Lock) Iterations() int64 { return l.iter.Load() }
// Client returns the Spanner client.
func (l *Lock) Client() *spanner.Client { return l.db }

func (l *Lock) tokenString() string {
func (l *Lock) token() uint64 {
l.mtx.Lock()
defer l.mtx.Unlock()
if l.token == nil {
return ""
if l.ttoken == nil {
return 0
}

return (*l.token).UTC().Format(time.RFC3339Nano)
v := (*l.ttoken).UTC().Format(time.RFC3339Nano)
return xxhash.Sum64String(v)
}

func (l *Lock) setToken(v *time.Time) {
l.mtx.Lock()
defer l.mtx.Unlock()
l.token = v
l.ttoken = v
}

type diffT struct {
Diff spanner.NullInt64
Token spanner.NullTime
}

func (l *Lock) checkLock() (string, int64, error) {
var tokenLocked string
func (l *Lock) checkLock() (uint64, int64, error) {
var token string
var diff int64

err := func() error {
Expand Down Expand Up @@ -361,21 +366,21 @@ func (l *Lock) checkLock() (string, int64, error) {
}

diff = v.Diff.Int64
tokenLocked = v.Token.Time.UTC().Format(time.RFC3339Nano)
token = v.Token.Time.UTC().Format(time.RFC3339Nano)
}

return retErr
}()

return tokenLocked, diff, err
return xxhash.Sum64String(token), diff, err
}

type tokenT struct {
Token spanner.NullTime
Writer spanner.NullString
}

func (l *Lock) getCurrentTokenAndId() (string, string, error) {
func (l *Lock) getCurrentToken() (uint64, string, error) {
var q strings.Builder
fmt.Fprintf(&q, "select token, writer from %s ", l.table)
fmt.Fprintf(&q, "where name = @name")
Expand All @@ -394,13 +399,13 @@ func (l *Lock) getCurrentTokenAndId() (string, string, error) {
}

if err != nil {
return token, writer, err
return 0, writer, err
}

var v tokenT
err = row.ToStruct(&v)
if err != nil {
return token, writer, err
return 0, writer, err
}

token = v.Token.Time.UTC().Format(time.RFC3339Nano)
Expand All @@ -409,7 +414,7 @@ func (l *Lock) getCurrentTokenAndId() (string, string, error) {
}
}

return token, writer, nil
return xxhash.Sum64String(token), writer, nil
}

func (l *Lock) heartbeat() {
Expand Down

0 comments on commit 9fe5bca

Please sign in to comment.