Skip to content

Commit 4d97392

Browse files
authored
Add lock for storing content from source (#37)
* add lock for storing content from source * move lock into client * add failed_to_acquire_lock field to proto response object * increase and make constant * missed spot
1 parent 7050ba5 commit 4d97392

File tree

8 files changed

+319
-95
lines changed

8 files changed

+319
-95
lines changed

pkg/client.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,30 @@ func (c *BlobCacheClient) StoreContentFromSource(sourcePath string, sourceOffset
540540
return resp.Hash, nil
541541
}
542542

543+
func (c *BlobCacheClient) StoreContentFromSourceWithLock(sourcePath string, sourceOffset int64) (string, error) {
544+
ctx, cancel := context.WithTimeout(c.ctx, storeContentRequestTimeout)
545+
defer cancel()
546+
547+
client, _, err := c.getGRPCClient(ctx, &ClientRequest{
548+
rt: ClientRequestTypeStorage,
549+
})
550+
if err != nil {
551+
return "", err
552+
}
553+
554+
resp, err := client.StoreContentFromSourceWithLock(ctx, &proto.StoreContentFromSourceRequest{SourcePath: sourcePath, SourceOffset: sourceOffset})
555+
if err != nil {
556+
return "", err
557+
}
558+
559+
if resp.FailedToAcquireLock {
560+
return "", ErrUnableToAcquireLock
561+
}
562+
563+
return resp.Hash, nil
564+
565+
}
566+
543567
func (c *BlobCacheClient) HostsAvailable() bool {
544568
return c.hostMap.Members().Cardinality() > 0
545569
}

pkg/errors.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,5 @@ var (
1212
ErrUnableToPopulateContent = errors.New("unable to populate content from original source")
1313
ErrBlobFsMountFailure = errors.New("failed to mount blobfs")
1414
ErrTailscaleAuthentication = errors.New("tailscale authentication failed")
15+
ErrUnableToAcquireLock = errors.New("unable to acquire lock")
1516
)

pkg/metadata.go

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ import (
1616
redis "github.com/redis/go-redis/v9"
1717
)
1818

19+
const (
20+
storeFromContentLockTtlS = 5
21+
)
22+
1923
type BlobCacheMetadata struct {
2024
rdb *RedisClient
2125
lock *RedisLock
@@ -368,17 +372,30 @@ func (m *BlobCacheMetadata) RemoveFsNodeChild(ctx context.Context, id string) er
368372
return nil
369373
}
370374

375+
func (m *BlobCacheMetadata) SetStoreFromContentLock(ctx context.Context, sourcePath string) error {
376+
return m.lock.Acquire(ctx, MetadataKeys.MetadataStoreFromContentLock(sourcePath), RedisLockOptions{TtlS: storeFromContentLockTtlS, Retries: 0})
377+
}
378+
379+
func (m *BlobCacheMetadata) RefreshStoreFromContentLock(ctx context.Context, sourcePath string) error {
380+
return m.lock.Refresh(MetadataKeys.MetadataStoreFromContentLock(sourcePath), RedisLockOptions{TtlS: storeFromContentLockTtlS, Retries: 0})
381+
}
382+
383+
func (m *BlobCacheMetadata) RemoveStoreFromContentLock(ctx context.Context, sourcePath string) error {
384+
return m.lock.Release(MetadataKeys.MetadataStoreFromContentLock(sourcePath))
385+
}
386+
371387
// Metadata key storage format
372388
var (
373-
metadataPrefix string = "blobcache"
374-
metadataHostIndex string = "blobcache:host_index"
375-
metadataEntry string = "blobcache:entry:%s"
376-
metadataClientLock string = "blobcache:client_lock:%s:%s"
377-
metadataLocation string = "blobcache:location:%s"
378-
metadataRef string = "blobcache:ref:%s"
379-
metadataFsNode string = "blobcache:fs:node:%s"
380-
metadataFsNodeChildren string = "blobcache:fs:node:%s:children"
381-
metadataHostKeepAlive string = "blobcache:host:keepalive:%s"
389+
metadataPrefix string = "blobcache"
390+
metadataHostIndex string = "blobcache:host_index"
391+
metadataEntry string = "blobcache:entry:%s"
392+
metadataClientLock string = "blobcache:client_lock:%s:%s"
393+
metadataLocation string = "blobcache:location:%s"
394+
metadataRef string = "blobcache:ref:%s"
395+
metadataFsNode string = "blobcache:fs:node:%s"
396+
metadataFsNodeChildren string = "blobcache:fs:node:%s:children"
397+
metadataHostKeepAlive string = "blobcache:host:keepalive:%s"
398+
metadataStoreFromContentLock string = "blobcache:store_from_content_lock:%s"
382399
)
383400

384401
// Metadata keys
@@ -418,6 +435,11 @@ func (k *metadataKeys) MetadataClientLock(hostname, hash string) string {
418435
return fmt.Sprintf(metadataClientLock, hostname, hash)
419436
}
420437

438+
func (k *metadataKeys) MetadataStoreFromContentLock(sourcePath string) string {
439+
sourcePath = strings.ReplaceAll(sourcePath, "/", "_")
440+
return fmt.Sprintf(metadataStoreFromContentLock, sourcePath)
441+
}
442+
421443
var MetadataKeys = &metadataKeys{}
422444

423445
type metadataKeys struct{}

pkg/redis.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,20 @@ func (l *RedisLock) Release(key string) error {
312312
return redislock.ErrLockNotHeld
313313
}
314314

315+
func (l *RedisLock) Refresh(key string, opts RedisLockOptions) error {
316+
l.mu.Lock()
317+
defer l.mu.Unlock()
318+
319+
lock, ok := l.locks[key]
320+
if !ok {
321+
return redislock.ErrLockNotHeld
322+
}
323+
324+
return lock.Refresh(context.TODO(), time.Duration(opts.TtlS)*time.Second, &redislock.Options{
325+
RetryStrategy: redislock.LimitRetry(redislock.LinearBackoff(100*time.Millisecond), opts.Retries),
326+
})
327+
}
328+
315329
// Copies the result of HGetAll to a provided struct.
316330
// If a field cannot be parsed, we use Go's default value.
317331
// Struct fields must have the redis tag on them otherwise they will be ignored.

pkg/server.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,3 +348,38 @@ func (cs *CacheService) StoreContentFromSource(ctx context.Context, req *proto.S
348348

349349
return &proto.StoreContentFromSourceResponse{Ok: true, Hash: hash}, nil
350350
}
351+
352+
func (cs *CacheService) StoreContentFromSourceWithLock(ctx context.Context, req *proto.StoreContentFromSourceRequest) (*proto.StoreContentFromSourceWithLockResponse, error) {
353+
sourcePath := req.SourcePath
354+
if err := cs.metadata.SetStoreFromContentLock(ctx, sourcePath); err != nil {
355+
return &proto.StoreContentFromSourceWithLockResponse{FailedToAcquireLock: true}, nil
356+
}
357+
358+
storeContext, cancel := context.WithCancel(ctx)
359+
defer cancel()
360+
361+
go func() {
362+
ticker := time.NewTicker(time.Second)
363+
defer ticker.Stop()
364+
for {
365+
select {
366+
case <-storeContext.Done():
367+
return
368+
case <-ticker.C:
369+
Logger.Infof("StoreContentFromSourceWithLock[REFRESH] - [%s]", sourcePath)
370+
cs.metadata.RefreshStoreFromContentLock(ctx, sourcePath)
371+
}
372+
}
373+
}()
374+
375+
storeContentFromSourceResp, err := cs.StoreContentFromSource(storeContext, req)
376+
if err != nil {
377+
return nil, err
378+
}
379+
380+
if err := cs.metadata.RemoveStoreFromContentLock(ctx, sourcePath); err != nil {
381+
Logger.Errorf("StoreContentFromSourceWithLock[ERR] - error removing lock: %v", err)
382+
}
383+
384+
return &proto.StoreContentFromSourceWithLockResponse{Hash: storeContentFromSourceResp.Hash}, nil
385+
}

0 commit comments

Comments
 (0)