Skip to content

Commit

Permalink
Merge pull request #527 from OdyseeTeam/improve-redis-cache
Browse files Browse the repository at this point in the history
Fix cache get method lookup
  • Loading branch information
anbsky authored Nov 20, 2024
2 parents 2750028 + 2905d26 commit d58e7bf
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 34 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# syntax=docker/dockerfile:1
FROM odyseeteam/transcoder-ffmpeg:5.1.1 AS ffmpeg
FROM alpine:3.16
FROM alpine:3.20
EXPOSE 8080

RUN apk update && \
Expand Down
2 changes: 1 addition & 1 deletion app/query/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (c *QueryCache) Retrieve(query *Query, getter func() (any, error)) (*Cached
if err != nil {
if !errors.Is(err, &store.NotFound{}) {
ObserveQueryCacheOperation(CacheOperationGet, CacheResultError, cacheReq.Method, start)
return nil, fmt.Errorf("error during cache.get: %w", err)
return nil, nil
}

ObserveQueryCacheOperation(CacheOperationGet, CacheResultMiss, cacheReq.Method, start)
Expand Down
5 changes: 0 additions & 5 deletions app/query/caller.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,11 +297,6 @@ func (c *Caller) SendQuery(ctx context.Context, q *Query) (*jsonrpc.RPCResponse,
return r, err
}

// IsCacheable returns true if this query can be cached.
func (q *Query) IsCacheable() bool {
return q.Method() == MethodResolve || q.Method() == MethodClaimSearch
}

func getLogLevel(m string) logrus.Level {
if methodInList(m, []string{MethodWalletBalance, MethodSyncApply}) {
return logrus.DebugLevel
Expand Down
21 changes: 8 additions & 13 deletions pkg/sturdycache/sturdycache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import (
"github.com/eko/gocache/lib/v4/store"
redis_store "github.com/eko/gocache/store/redis/v4"
"github.com/redis/go-redis/v9"
"golang.org/x/exp/rand"
)

type ReplicatedCache struct {
masterCache *cache.Cache[any]
replicaCaches []*cache.Cache[any]
readCaches []*cache.Cache[any]
}

// NewReplicatedCache creates a new gocache store instance for redis master-replica setups.
Expand All @@ -32,9 +34,9 @@ func NewReplicatedCache(
masterStore := redis_store.NewRedis(masterClient)
masterCache := cache.New[any](masterStore)

replicaCaches := make([]*cache.Cache[any], len(replicaAddrs))
replicaCaches := []*cache.Cache[any]{}

for i, addr := range replicaAddrs {
for _, addr := range replicaAddrs {
replicaClient := redis.NewClient(&redis.Options{
Addr: addr,
Password: password,
Expand All @@ -44,12 +46,13 @@ func NewReplicatedCache(
})

replicaStore := redis_store.NewRedis(replicaClient)
replicaCaches[i] = cache.New[any](replicaStore)
replicaCaches = append(replicaCaches, cache.New[any](replicaStore))
}

cache := &ReplicatedCache{
masterCache: masterCache,
replicaCaches: replicaCaches,
readCaches: append(replicaCaches, masterCache),
}

return cache, nil
Expand All @@ -60,17 +63,9 @@ func (rc *ReplicatedCache) Set(ctx context.Context, key any, value any, options
return rc.masterCache.Set(ctx, key, value, options...)
}

// Get tries replicas first, falls back to master.
// Get reads from master and replica caches.
func (rc *ReplicatedCache) Get(ctx context.Context, key any) (any, error) {
for _, replica := range rc.replicaCaches {
value, err := replica.Get(ctx, key)
if err == nil {
return value, nil
}
}

// Fallback to master
return rc.masterCache.Get(ctx, key)
return rc.readCaches[rand.Intn(len(rc.readCaches))].Get(ctx, key)
}

// Invalidate master cache entries for given options.
Expand Down
33 changes: 19 additions & 14 deletions pkg/sturdycache/sturdycache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,17 +107,6 @@ func (s *ReplicatedCacheTestSuite) TestGetWithReplicaFailures() {
r.Set(testKey, masterValue)
}

// Test scenario: replicas fail one by one
for i, r := range s.replicas {
s.T().Logf("Testing with replica %d down", i)

r.Close()

value, err := s.cache.Get(s.ctx, testKey)
s.Require().NoError(err)
s.Require().Equal(testValue, value)
}

// Test with all replicas down
value, err := s.cache.Get(s.ctx, testKey)
s.Require().NoError(err)
Expand Down Expand Up @@ -171,9 +160,25 @@ func (s *ReplicatedCacheTestSuite) TestSetStructValue() {
err := s.cache.Set(s.ctx, "struct-key", testValue)
s.Require().NoError(err)

value, err := s.cache.Get(s.ctx, "struct-key")
s.Require().NoError(err)
s.Require().NotNil(value)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
wait := time.NewTicker(100 * time.Millisecond)
Wait:
for {
select {
case <-ctx.Done():
s.FailNow("failed to read value")
case <-wait.C:
value, err := s.cache.Get(s.ctx, "struct-key")
if err != nil {
continue
}
s.Require().NoError(err)
s.Require().NotNil(value)
break Wait
}
}

}

func TestReplicatedCacheTestSuite(t *testing.T) {
Expand Down

0 comments on commit d58e7bf

Please sign in to comment.