A Valkey Distributed Lock Pattern enhanced by Client Side Caching.
package main
import (
"context"
"github.com/valkey-io/valkey-go"
"github.com/valkey-io/valkey-go/valkeylock"
)
func main() {
locker, err := valkeylock.NewLocker(valkeylock.LockerOption{
ClientOption: valkey.ClientOption{InitAddress: []string{"localhost:6379"}},
KeyMajority: 1, // Use KeyMajority=1 if you have only one Valkey instance. Also make sure that all your `Locker`s share the same KeyMajority.
NoLoopTracking: true, // Enable this to have better performance if all your Valkey are >= 7.0.5.
})
if err != nil {
panic(err)
}
defer locker.Close()
// acquire the lock "my_lock"
ctx, cancel, err := locker.WithContext(context.Background(), "my_lock")
if err != nil {
panic(err)
}
// "my_lock" is acquired. use the ctx as normal.
doSomething(ctx)
// invoke cancel() to release the lock.
cancel()
}
- The returned
ctx
will be canceled automatically and immediately once theKeyMajority
is not held anymore, for example:- Valkey are down.
- Acquired keys has been deleted by other programs or administrators.
- The waiting
Locker.WithContext
will try acquiring the lock again automatically and immediately once it has been released by someone or by another program.
When the locker.WithContext
is invoked, it will:
- Try acquiring 3 keys (given that the default
KeyMajority
is 2), which arevalkeylock:0:my_lock
,valkeylock:1:my_lock
andvalkeylock:2:my_lock
, by sending valkey commandSET NX PXAT
orSET NX PX
ifFallbackSETPX
is set. - If the
KeyMajority
is satisfied within theKeyValidity
duration, the invocation is successful and actx
is returned as the lock. - If the invocation is not successful, it will wait for client-side caching notifications to retry again.
- If the invocation is successful, the
Locker
will extend thectx
validity periodically and also watch client-side caching notifications for canceling thectx
if theKeyMajority
is not held anymore.
Some Valkey provider doesn't support client-side caching, ex. Google Cloud Memorystore.
You can disable client-side caching by setting ClientOption.DisableCache
to true
.
Please note that when the client-side caching is disabled, valkeylock will only try to re-acquire locks for every ExtendInterval.
▶ go test -bench=. -benchmem -run=.
goos: darwin
goarch: arm64
pkg: rueidis-benchmark/locker
Benchmark/rueidislock-10 20103 57842 ns/op 1849 B/op 29 allocs/op
Benchmark/redislock-10 13209 86285 ns/op 8083 B/op 225 allocs/op
PASS
ok rueidis-benchmark/locker 3.782s
package locker
import (
"context"
"testing"
"time"
"github.com/bsm/redislock"
"github.com/redis/go-redis/v9"
"github.com/redis/rueidis"
"github.com/redis/rueidis/rueidislock"
)
func Benchmark(b *testing.B) {
b.Run("rueidislock", func(b *testing.B) {
l, _ := rueidislock.NewLocker(rueidislock.LockerOption{
ClientOption: rueidis.ClientOption{InitAddress: []string{"127.0.0.1:6379"}},
KeyMajority: 1,
NoLoopTracking: true,
})
defer l.Close()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_, cancel, err := l.WithContext(context.Background(), "mylock")
if err != nil {
panic(err)
}
cancel()
}
})
b.StopTimer()
})
b.Run("redislock", func(b *testing.B) {
client := redis.NewUniversalClient(&redis.UniversalOptions{Addrs: []string{"127.0.0.1:6379"}})
locker := redislock.New(client)
defer client.Close()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
retry:
lock, err := locker.Obtain(context.Background(), "mylock", time.Minute, nil)
if err == redislock.ErrNotObtained {
goto retry
} else if err != nil {
panic(err)
}
lock.Release(context.Background())
}
})
b.StopTimer()
})
}