keyed-semaphore provides a semaphore implementation in Go where the locking is based on arbitrary keys. This allows you to limit concurrency for operations associated with specific identifiers, rather than just globally.
For example, you can use it to limit the number of concurrent operations per user ID, per resource ID, or any other comparable key type.
- Key-Based Concurrency Limiting: Limit concurrent access based on generic keys (any Go
comparabletype). - Sharded Implementation for Enhanced Scalability: Includes a
ShardedKeyedSemaphorethat distributes keys across multiple internalKeyedSemaphoreinstances. This significantly reduces lock contention and improves performance, especially under high load with many unique keys. - Configurable Concurrency: Set the maximum number of concurrent acquirers per key.
- Context-Aware Waiting: The
Waitmethod respectscontext.Contextfor cancellation and deadlines. - Non-Blocking TryWait: A
TryWaitmethod is available to attempt acquiring the semaphore without blocking, also respectingcontext.Context. - Dynamic Creation: Semaphores for keys are created on demand when first accessed.
- Robust Memory Management: Implemented reference counting for automatic and safe cleanup of internal resources associated with a key when it's no longer in active use, preventing memory leaks.
- Improved Concurrency Safety: Hardened against race conditions for reliable behavior under high concurrent access.
go get github.com/MonsieurTib/keyed-semaphoreThe following example demonstrates using KeyedSemaphore with string keys. Note that KeyedSemaphore now supports generic key types.
package main
import (
"context"
"fmt"
ks "github.com/MonsieurTib/keyed-semaphore"
"sync"
"time"
)
func worker(id int, resourceID string, semaphore *ks.KeyedSemaphore[string], wg *sync.WaitGroup) {
defer wg.Done()
ctx := context.Background()
fmt.Printf("Worker %d: Attempting lock for resource '%s'...\n", id, resourceID)
err := semaphore.Wait(ctx, resourceID)
if err != nil {
fmt.Printf("Worker %d: Failed lock for resource '%s': %v\n", id, resourceID, err)
return
}
fmt.Printf("Worker %d: Acquired lock for resource '%s'. Working...\n", id, resourceID)
time.Sleep(time.Second * 1) // Simulate work
fmt.Printf("Worker %d: Work done. Releasing lock for resource '%s'.\n", id, resourceID)
err = semaphore.Release(resourceID)
if err != nil {
fmt.Printf("Worker %d: Failed to release lock for resource '%s': %v\n", id, resourceID, err)
}
}
func main() {
// Create a new KeyedSemaphore for string keys, allowing 2 concurrent operations per key.
maxConcurrentPerKey := 2
semaphore := ks.NewKeyedSemaphore[string](maxConcurrentPerKey)
var wg sync.WaitGroup
numWorkers := 5
resourceKey1 := "document-123"
fmt.Printf("Starting %d workers for resource '%s' (max %d concurrent)...\n",
numWorkers, resourceKey1, maxConcurrentPerKey)
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, resourceKey1, semaphore, &wg)
}
// Example with a different key type (if you were using int keys)
// type UserID int
// userIDKey := UserID(42)
// semaphoreInt := ks.NewKeyedSemaphore[UserID](maxConcurrentPerKey)
// ... then use semaphoreInt with userIDKey ...
wg.Wait()
fmt.Println("All workers finished for resourceKey1.")
}For scenarios with a very large number of unique keys or extremely high contention, ShardedKeyedSemaphore can provide better performance by dividing keys among several independent KeyedSemaphore instances (shards).
package main
import (
"context"
"fmt"
ks "github.com/MonsieurTib/keyed-semaphore"
"strconv"
"sync"
"time"
)
func shardedWorker(id int, resourceID string, shardedSem *ks.ShardedKeyedSemaphore[string], wg *sync.WaitGroup) {
defer wg.Done()
ctx := context.Background()
fmt.Printf("ShardedWorker %d: Attempting lock for resource '%s'...\n", id, resourceID)
shard := shardedSem.GetShard(resourceID)
err := shard.Wait(ctx, resourceID)
if err != nil {
fmt.Printf("ShardedWorker %d: Failed lock for resource '%s': %v\n", id, resourceID, err)
return
}
fmt.Printf("ShardedWorker %d: Acquired lock for resource '%s'. Working...\n", id, resourceID)
time.Sleep(time.Millisecond * 500) // Simulate work
fmt.Printf("ShardedWorker %d: Work done. Releasing lock for resource '%s'.\n", id, resourceID)
err = shard.Release(resourceID)
if err != nil {
fmt.Printf("ShardedWorker %d: Failed to release lock for resource '%s': %v\n", id, resourceID, err)
}
}
func main() {
shardCount := 16 // Number of internal shards
maxConcurrentPerKey := 2
// For string keys, you can use the provided HashString function or your own.
shardedSemaphore := ks.NewShardedKeyedSemaphore[string](shardCount, maxConcurrentPerKey, ks.HashString)
var wg sync.WaitGroup
numWorkers := 20
fmt.Printf("Starting %d sharded workers (max %d concurrent per key, across %d shards)...\n",
numWorkers, maxConcurrentPerKey, shardCount)
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
// Simulate different keys that might fall into different shards
resourceKey := "item-" + strconv.Itoa(i%5)
// Pass the main shardedSemaphore instance to the worker
go shardedWorker(i, resourceKey, shardedSemaphore, &wg)
}
wg.Wait()
fmt.Println("All sharded workers finished.")
}type Hasher[K comparable] func(K) uint64
- A function type that takes a key of generic type
Kand returns auint64hash value. HashString(key string) uint64is provided as a convenient hasher for string keys usingxxhash.
NewKeyedSemaphore[K comparable](maxSize int) *KeyedSemaphore[K]: Creates a new keyed semaphore manager.maxSizedefines the maximum concurrent holders for each key.Kcan be any Gocomparabletype.Wait(ctx context.Context, key K) error: Waits to acquire the semaphore for the givenkey. Blocks until acquisition is possible or thectxis cancelled/times out. Returnsctx.Err()on cancellation/timeout.TryWait(ctx context.Context, key K) bool: Attempts to acquire the semaphore for the givenkeywithout blocking. Respectscontext.Contextfor early cancellation. Returnstrueif successful,falseotherwise.Release(key K) error: Releases the semaphore for the givenkey. Returns an error if the semaphore for the key was not previously acquired or if issues occur during release.
NewShardedKeyedSemaphore[K comparable](shardCount, maxSize int, hasher Hasher[K]) *ShardedKeyedSemaphore[K]: Creates a new sharded keyed semaphore manager.shardCount: The number of internalKeyedSemaphoreinstances (shards).maxSize: The maximum concurrent holders for each key within its designated shard.hasher: AHasher[K]function to distribute keys among shards.
(sks *ShardedKeyedSemaphore[K]) GetShard(key K) *KeyedSemaphore[K]: Returns the specificKeyedSemaphoreinstance (shard) responsible for the givenkey. Operations (Wait,TryWait,Release) should then be called on this returned shard.
Contributions are welcome! Please feel free to submit issues and pull requests.