Skip to content

Commit

Permalink
Improve cache retrieval and logging
Browse files Browse the repository at this point in the history
  • Loading branch information
anbsky committed Nov 15, 2024
1 parent 0fd57a9 commit 83d72ef
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 46 deletions.
39 changes: 23 additions & 16 deletions app/query/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,40 +56,49 @@ func (c *QueryCache) Retrieve(query *Query, getter func() (any, error)) (*Cached
if err != nil {
if !errors.Is(err, &store.NotFound{}) {
metrics.SturdyQueryCacheErrorCount.WithLabelValues(cacheReq.Method).Inc()
return nil, fmt.Errorf("failed to cache.get: %w", err)
return nil, fmt.Errorf("error during cache.get: %w", err)
}

metrics.SturdyQueryCacheMissCount.WithLabelValues(cacheReq.Method).Inc()

if getter == nil {
log.Warnf("nil getter provided for %s", query.Method())
log.Warnf("nil getter provided for %s", cacheReq.Method)
metrics.SturdyQueryCacheErrorCount.WithLabelValues(cacheReq.Method).Inc()
return nil, errors.New("cache miss with no object getter provided")
return nil, nil
}

log.Infof("cold cache retrieval for %s", query.Method())
// Cold object retrieval after cache miss
log.Infof("cold object retrieval for %s [%s]", cacheReq.Method, cacheReq.GetCacheKey())
obj, err, _ := c.singleflight.Do(cacheReq.GetCacheKey(), getter)
if err != nil {
metrics.SturdyQueryCacheErrorCount.WithLabelValues(cacheReq.Method).Inc()
return nil, fmt.Errorf("failed to call object getter: %w", err)
return nil, fmt.Errorf("error calling getter: %w", err)
}

res, ok := obj.(*jsonrpc.RPCResponse)
if !ok {
return nil, errors.New("unknown type returned by getter")
}

cacheResp := &CachedResponse{Result: res.Result, Error: res.Error}
err = c.cache.Set(
ctx, cacheReq, cacheResp,
store.WithExpiration(cacheReq.Expiration()),
store.WithTags(cacheReq.Tags()),
)
if err != nil {
metrics.SturdyQueryCacheErrorCount.WithLabelValues(cacheReq.Method).Inc()
monitor.ErrorToSentry(fmt.Errorf("failed to cache.set: %w", err), map[string]string{"method": cacheReq.Method})
return nil, fmt.Errorf("failed to cache.set: %w", err)
if res.Error != nil {
log.Debugf("rpc error received (%s), not caching", cacheReq.Method)
} else {
err = c.cache.Set(
ctx, cacheReq, cacheResp,
store.WithExpiration(cacheReq.Expiration()),
store.WithTags(cacheReq.Tags()),
)
if err != nil {
metrics.SturdyQueryCacheErrorCount.WithLabelValues(cacheReq.Method).Inc()
monitor.ErrorToSentry(fmt.Errorf("error during cache.set: %w", err), map[string]string{"method": cacheReq.Method})
return cacheResp, fmt.Errorf("error during cache.set: %w", err)
}
}

return cacheResp, nil
}
log.Debugf("cache hit for %s [%s]", cacheReq.Method, cacheReq.GetCacheKey())
cacheResp, ok := hit.(*CachedResponse)
if !ok {
metrics.SturdyQueryCacheErrorCount.WithLabelValues(cacheReq.Method).Inc()
Expand Down Expand Up @@ -157,12 +166,10 @@ func preflightCacheHook(caller *Caller, ctx context.Context) (*jsonrpc.RPCRespon
}
query := QueryFromContext(ctx)
cachedResp, err := caller.Cache.Retrieve(query, func() (any, error) {
log.Debugf("cache miss, calling %s", query.Method())
return caller.SendQuery(ctx, query)
})
if err != nil {
return nil, rpcerrors.NewSDKError(err)
}
log.Debugf("cache hit for %s", query.Method())
return cachedResp.RPCResponse(query.Request.ID), nil
}
37 changes: 37 additions & 0 deletions app/query/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package query

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/ybbus/jsonrpc"
)

func TestGetCacheKey(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
assert.Equal(1, 1)
seen := map[string]bool{}
params := []map[string]any{{}, {"uri": "what"}, {"uri": "odysee"}, nil}
genCacheKey := func(params map[string]any) string {
req := jsonrpc.NewRequest(MethodResolve, params)
query, err := NewQuery(req, "")
require.NoError(err)
cacheReq := CacheRequest{
Method: query.Method(),
Params: query.Params(),
}
return cacheReq.GetCacheKey()
}
for _, p := range params {
t.Run(fmt.Sprintf("%+v", p), func(t *testing.T) {
cacheKey := genCacheKey(p)
assert.Len(cacheKey, 32)
assert.NotContains(seen, cacheKey)
seen[cacheKey] = true
})
}
assert.Contains(seen, genCacheKey(params[1]))
}
7 changes: 3 additions & 4 deletions app/query/caller.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ func (c *Caller) Endpoint() string {
return c.endpoint
}

// Call method forwards a JSON-RPC request to the lbrynet server.
// It returns a response that is ready to be sent back to the JSON-RPC client as is.
// Call method takes JSON-RPC request through a set of hooks and forwards it to lbrynet server.
// It returns a response that is ready to be sent back to the JSON-RPC client.
func (c *Caller) Call(ctx context.Context, req *jsonrpc.RPCRequest) (*jsonrpc.RPCResponse, error) {
origin := OriginFromContext(ctx)
metrics.ProxyCallCounter.WithLabelValues(req.Method, c.Endpoint(), origin).Inc()
Expand All @@ -171,8 +171,6 @@ func (c *Caller) Call(ctx context.Context, req *jsonrpc.RPCRequest) (*jsonrpc.RP
return res, err
}

// Call method forwards a JSON-RPC request to the lbrynet server.
// It returns a response that is ready to be sent back to the JSON-RPC client as is.
func (c *Caller) call(ctx context.Context, req *jsonrpc.RPCRequest) (*jsonrpc.RPCResponse, error) {
if c.endpoint == "" {
return nil, errors.Err("cannot call blank endpoint")
Expand Down Expand Up @@ -207,6 +205,7 @@ func (c *Caller) call(ctx context.Context, req *jsonrpc.RPCRequest) (*jsonrpc.RP
return c.SendQuery(ctx, q)
}

// SendQuery is where the actual RPC call happens, bypassing all hooks and retrying in case of "wallet not loaded" errors.
func (c *Caller) SendQuery(ctx context.Context, q *Query) (*jsonrpc.RPCResponse, error) {
var (
r *jsonrpc.RPCResponse
Expand Down
45 changes: 19 additions & 26 deletions app/query/caller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,27 +381,11 @@ func TestCaller_CloneWithoutHook(t *testing.T) {

func TestCaller_CallCachingResponses(t *testing.T) {
var err error

srv := test.MockHTTPServer(nil)
defer srv.Close()
srv.NextResponse <- `
{
"jsonrpc": "2.0",
"result": {
"blocked": {
"channels": [],
"total": 0
},
"items": [
{
"address": "bHz3LpVcuadmbK8g6VVUszF9jjH4pxG2Ct",
"amount": "0.5",
"canonical_url": "lbry://@lbry#3f/youtube-is-over-lbry-odysee-are-here#4"
}
]
},
"id": 0
}
`

srv.NextResponse <- resolveResponseFree

c := NewCaller(srv.URL, 0)

Expand All @@ -410,21 +394,30 @@ func TestCaller_CallCachingResponses(t *testing.T) {
c.Cache = NewQueryCache(cache)
require.NoError(t, err)

rpcReq := jsonrpc.NewRequest("claim_search", map[string]any{"urls": "what"})
rpcReq := jsonrpc.NewRequest("resolve", map[string]any{"urls": "what"})
rpcResponse, err := c.Call(bgctx(), rpcReq)
require.NoError(t, err)
assert.Nil(t, rpcResponse.Error)

time.Sleep(1 * time.Second)
q, _ := NewQuery(rpcReq, "")
expResponse, err := decodeResponse(resolveResponseFree)
require.NoError(t, err)
assert.EqualValues(t, expResponse.Result, rpcResponse.Result)

cResp, err := c.Cache.Retrieve(q, nil)
srv.NextResponse <- resolveResponseCouldntFind

rpcReq2 := jsonrpc.NewRequest("resolve", map[string]any{"urls": "one"})
rpcResponse2, err := c.Call(bgctx(), rpcReq2)
require.NoError(t, err)
assert.NotNil(t, cResp.Result)
assert.Nil(t, rpcResponse.Error)

expResponse2, err := decodeResponse(resolveResponseCouldntFind)
require.NoError(t, err)
assert.Nil(t, rpcResponse2.Error)
assert.EqualValues(t, expResponse2.Result, rpcResponse2.Result)

}

func TestCaller_CallNotCachingErrors(t *testing.T) {
t.SkipNow()
var err error
srv := test.MockHTTPServer(nil)
defer srv.Close()
Expand Down Expand Up @@ -453,7 +446,7 @@ func TestCaller_CallNotCachingErrors(t *testing.T) {
q, err := NewQuery(rpcReq, "")
require.NoError(t, err)

cResp, err := c.Cache.Retrieve(q, func() (any, error) { return nil, nil })
cResp, err := c.Cache.Retrieve(q, nil)
require.NoError(t, err)
assert.Nil(t, cResp)
}
Expand Down
16 changes: 16 additions & 0 deletions app/query/testing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package query

import (
"encoding/json"
"strings"

"github.com/ybbus/jsonrpc"
)

func decodeResponse(r string) (*jsonrpc.RPCResponse, error) {
decoder := json.NewDecoder(strings.NewReader(r))
decoder.DisallowUnknownFields()
decoder.UseNumber()
response := &jsonrpc.RPCResponse{}
return response, decoder.Decode(response)
}

0 comments on commit 83d72ef

Please sign in to comment.