Skip to content

Commit

Permalink
Merge branch 'improve-redis-cache' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
anbsky committed Nov 14, 2024
2 parents f17b59b + 86d7fd3 commit 410a636
Show file tree
Hide file tree
Showing 21 changed files with 678 additions and 112 deletions.
14 changes: 11 additions & 3 deletions api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
gpmetrics "github.com/OdyseeTeam/odysee-api/app/geopublish/metrics"
"github.com/OdyseeTeam/odysee-api/app/proxy"
"github.com/OdyseeTeam/odysee-api/app/publish"
"github.com/OdyseeTeam/odysee-api/app/query/cache"
"github.com/OdyseeTeam/odysee-api/app/query"
"github.com/OdyseeTeam/odysee-api/app/sdkrouter"
"github.com/OdyseeTeam/odysee-api/app/wallet"
"github.com/OdyseeTeam/odysee-api/apps/lbrytv/config"
Expand All @@ -26,6 +26,7 @@ import (
"github.com/OdyseeTeam/odysee-api/pkg/keybox"
"github.com/OdyseeTeam/odysee-api/pkg/logging/zapadapter"
"github.com/OdyseeTeam/odysee-api/pkg/redislocker"
"github.com/OdyseeTeam/odysee-api/pkg/sturdycache"
"github.com/OdyseeTeam/player-server/pkg/paid"

sentryhttp "github.com/getsentry/sentry-go/http"
Expand Down Expand Up @@ -199,10 +200,17 @@ func InstallRoutes(r *mux.Router, sdkRouter *sdkrouter.Router, opts *RoutesOptio
}

func defaultMiddlewares(oauthAuther auth.Authenticator, legacyProvider auth.Provider, router *sdkrouter.Router) mux.MiddlewareFunc {
queryCache, err := cache.New(cache.DefaultConfig())
store, err := sturdycache.NewReplicatedCache(
config.GetSturdyCacheMaster(),
config.GetSturdyCacheReplicas(),
config.GetSturdyCachePassword(),
)
if err != nil {
panic(err)
}
cache := query.NewQueryCache(store)
logger.Log().Infof("cache configured: master=%s", config.GetSturdyCacheMaster())

defaultHeaders := []string{
wallet.LegacyTokenHeader, wallet.AuthorizationHeader, "X-Requested-With", "Content-Type", "Accept",
}
Expand All @@ -222,7 +230,7 @@ func defaultMiddlewares(oauthAuther auth.Authenticator, legacyProvider auth.Prov
sdkrouter.Middleware(router),
auth.Middleware(oauthAuther), // Will pass forward user/error to next
auth.LegacyMiddleware(legacyProvider),
cache.Middleware(queryCache),
query.CacheMiddleware(cache),
)
}

Expand Down
15 changes: 0 additions & 15 deletions app/geopublish/geopublish.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package geopublish

import (
"context"
"database/sql"
"encoding/json"
"fmt"
Expand All @@ -14,7 +13,6 @@ import (
"github.com/OdyseeTeam/odysee-api/app/geopublish/forklift"
"github.com/OdyseeTeam/odysee-api/app/proxy"
"github.com/OdyseeTeam/odysee-api/app/query"
"github.com/OdyseeTeam/odysee-api/app/query/cache"
"github.com/OdyseeTeam/odysee-api/app/sdkrouter"
"github.com/OdyseeTeam/odysee-api/internal/errors"
"github.com/OdyseeTeam/odysee-api/internal/metrics"
Expand Down Expand Up @@ -422,19 +420,6 @@ func (h *Handler) getUserFromRequest(r *http.Request) (*models.User, error) {
return h.options.userGetter.FromRequest(r)
}

func getCaller(sdkAddress, filename string, userID int, qCache *cache.Cache) *query.Caller {
c := query.NewCaller(sdkAddress, userID)
c.Cache = qCache
c.AddPreflightHook(query.AllMethodsHook, func(_ *query.Caller, ctx context.Context) (*jsonrpc.RPCResponse, error) {
q := query.QueryFromContext(ctx)
params := q.ParamsAsMap()
params[fileNameParam] = filename
q.Request.Params = params
return nil, nil
}, "")
return c
}

// observeFailure requires metrics.MeasureMiddleware middleware to be present on the request
func observeFailure(d float64, kind string) {
metrics.ProxyE2ECallDurations.WithLabelValues(method).Observe(d)
Expand Down
8 changes: 3 additions & 5 deletions app/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,6 @@ func Handle(w http.ResponseWriter, r *http.Request) {
sdkAddress = rt.RandomServer().Address
}

var qCache *cache.Cache
if cache.IsOnRequest(r) {
qCache = cache.FromRequest(r)
}
c := query.NewCaller(sdkAddress, userID)

remoteIP := ip.FromRequest(r)
Expand All @@ -134,7 +130,9 @@ func Handle(w http.ResponseWriter, r *http.Request) {
return nil, nil
}, "")

c.Cache = qCache
if cache.HasCache(r) {
c.Cache = query.CacheFromRequest(r)
}

rpcRes, err := c.Call(query.AttachOrigin(r.Context(), origin), rpcReq)

Expand Down
8 changes: 4 additions & 4 deletions app/publish/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,9 @@ retry:
}
}()

var qCache *cache.Cache
if cache.IsOnRequest(r) {
qCache = cache.FromRequest(r)
var qCache *query.QueryCache
if cache.HasCache(r) {
qCache = query.CacheFromRequest(r)
}

var rpcReq *jsonrpc.RPCRequest
Expand Down Expand Up @@ -223,7 +223,7 @@ retry:
observeSuccess(metrics.GetDuration(r))
}

func getCaller(sdkAddress, filename string, userID int, qCache *cache.Cache) *query.Caller {
func getCaller(sdkAddress, filename string, userID int, qCache *query.QueryCache) *query.Caller {
c := query.NewCaller(sdkAddress, userID)
c.Cache = qCache
c.AddPreflightHook(query.AllMethodsHook, func(_ *query.Caller, ctx context.Context) (*jsonrpc.RPCResponse, error) {
Expand Down
8 changes: 4 additions & 4 deletions app/publish/tus.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,10 @@ func (h TusHandler) Notify(w http.ResponseWriter, r *http.Request) {
return
}

// upload is completed, notify it to lbrynet server
var qCache *cache.Cache
if cache.IsOnRequest(r) {
qCache = cache.FromRequest(r)
// upload is completed, notify lbrynet server
var qCache *query.QueryCache
if cache.HasCache(r) {
qCache = query.CacheFromRequest(r)
}

var rpcReq *jsonrpc.RPCRequest
Expand Down
160 changes: 160 additions & 0 deletions app/query/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package query

import (
"context"
"crypto"
"encoding/json"
"errors"
"fmt"
"time"

"github.com/OdyseeTeam/odysee-api/internal/metrics"
"github.com/OdyseeTeam/odysee-api/internal/monitor"
"github.com/OdyseeTeam/odysee-api/pkg/rpcerrors"

"github.com/eko/gocache/lib/v4/cache"
"github.com/eko/gocache/lib/v4/marshaler"
"github.com/eko/gocache/lib/v4/store"
"github.com/ybbus/jsonrpc"
"golang.org/x/sync/singleflight"
)

type CacheRequest struct {
Method string
Params any
}

type CachedResponse struct {
Result any
Error *jsonrpc.RPCError
}

type QueryCache struct {
cache *marshaler.Marshaler
singleflight *singleflight.Group
}

func NewQueryCache(store cache.CacheInterface[any]) *QueryCache {
m := marshaler.New(store)
return &QueryCache{
cache: m,
singleflight: &singleflight.Group{},
}
}

func (c *QueryCache) Retrieve(query *Query, getter func() (any, error)) (*CachedResponse, error) {
cacheReq := CacheRequest{
Method: query.Method(),
Params: query.Params(),
}

ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond)
defer cancel()

hit, err := c.cache.Get(ctx, cacheReq, &CachedResponse{})
if err != nil {
if !errors.Is(err, &store.NotFound{}) {
metrics.SturdyQueryCacheErrorCount.WithLabelValues(cacheReq.Method).Inc()
return nil, fmt.Errorf("failed to cache.get: %w", err)
}
metrics.SturdyQueryCacheHitCount.WithLabelValues(cacheReq.Method).Inc()
if getter == nil {
metrics.SturdyQueryCacheErrorCount.WithLabelValues(cacheReq.Method).Inc()
return nil, errors.New("cache miss with no object getter provided")
}

obj, err, _ := c.singleflight.Do(cacheReq.GetCacheKey(), getter)
if err != nil {
metrics.SturdyQueryCacheErrorCount.WithLabelValues(cacheReq.Method).Inc()
return nil, fmt.Errorf("failed to call object getter: %w", err)
}
res, ok := obj.(*jsonrpc.RPCResponse)
if !ok {
return nil, errors.New("unknown type returned by getter")
}
cacheResp := &CachedResponse{Result: res.Result, Error: res.Error}
err = c.cache.Set(
ctx, cacheReq, cacheResp,
store.WithExpiration(cacheReq.Expiration()),
store.WithTags(cacheReq.Tags()),
)
if err != nil {
metrics.SturdyQueryCacheErrorCount.WithLabelValues(cacheReq.Method).Inc()
monitor.ErrorToSentry(fmt.Errorf("failed to cache.set: %w", err), map[string]string{"method": cacheReq.Method})
return nil, fmt.Errorf("failed to cache.set: %w", err)
}
return cacheResp, nil
}
cacheResp, ok := hit.(*CachedResponse)
if !ok {
metrics.SturdyQueryCacheErrorCount.WithLabelValues(cacheReq.Method).Inc()
return nil, errors.New("unknown cache object retrieved")
}
metrics.SturdyQueryCacheHitCount.WithLabelValues(cacheReq.Method).Inc()
return cacheResp, nil
}

func (r CacheRequest) Expiration() time.Duration {
switch r.Method {
case MethodResolve:
return 600 * time.Second
case MethodClaimSearch:
return 180 * time.Second
default:
return 60 * time.Second
}
}

func (r CacheRequest) Tags() []string {
return []string{"method:" + r.Method}
}

func (r CacheRequest) GetCacheKey() string {
digester := crypto.MD5.New()
var params string

if r.Params == nil {
params = "()"
} else {
if p, err := json.Marshal(r.Params); err != nil {
params = "(x)"
} else {
params = string(p)
}
}
fmt.Fprintf(digester, "%s:%s:%s", "request", r.Method, params)
hash := digester.Sum(nil)
return fmt.Sprintf("%x", hash)
}

func (r *CachedResponse) RPCResponse(id int) *jsonrpc.RPCResponse {
return &jsonrpc.RPCResponse{
JSONRPC: "2.0",
Result: r.Result,
Error: r.Error,
ID: id,
}
}

func (r *CachedResponse) MarshalBinary() ([]byte, error) {
return json.Marshal(r)
}

func (r *CachedResponse) UnmarshalBinary(data []byte) error {
return json.Unmarshal(data, r)
}

func preflightCacheHook(c *Caller, ctx context.Context) (*jsonrpc.RPCResponse, error) {
if c.Cache == nil {
return nil, nil
}
query := QueryFromContext(ctx)
cachedResp, err := c.Cache.Retrieve(query, func() (any, error) {
return c.SendQuery(ctx, query)
})
if err != nil {
return nil, rpcerrors.NewSDKError(err)
}
logger.Log().Debugf("FFS")
return cachedResp.RPCResponse(query.Request.ID), nil
}
2 changes: 1 addition & 1 deletion app/query/cache/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

const ContextKey = "cache"

func IsOnRequest(r *http.Request) bool {
func HasCache(r *http.Request) bool {
return r.Context().Value(ContextKey) != nil
}

Expand Down
36 changes: 10 additions & 26 deletions app/query/caller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"strings"
"time"

"github.com/OdyseeTeam/odysee-api/app/query/cache"
"github.com/OdyseeTeam/odysee-api/app/sdkrouter"
"github.com/OdyseeTeam/odysee-api/app/wallet"
"github.com/OdyseeTeam/odysee-api/apps/lbrytv/config"
Expand Down Expand Up @@ -58,7 +57,7 @@ type Caller struct {
postflightHooks []hookEntry

// Cache stores cacheable queries to improve performance
Cache *cache.Cache
Cache *QueryCache

Duration float64

Expand Down Expand Up @@ -110,15 +109,15 @@ func (c *Caller) getRPCClient(method string) jsonrpc.RPCClient {
// to JSON-RPC server altogether.
func (c *Caller) AddPreflightHook(method string, hf Hook, name string) {
c.preflightHooks = append(c.preflightHooks, hookEntry{method, hf, name})
logger.Log().Debugf("added a preflight hook for method %v", method)
logger.Log().Debugf("added a preflight hook for method %s, %s", method, name)
}

// AddPostflightHook adds query postflight hook function,
// allowing to amend the response before it gets sent back to the client
// or to modify log entry fields.
func (c *Caller) AddPostflightHook(method string, hf Hook, name string) {
c.postflightHooks = append(c.postflightHooks, hookEntry{method, hf, name})
logger.Log().Debugf("added a postflight hook for method %v", method)
logger.Log().Debugf("added a postflight hook for method %s, %s", method, name)
}

func (c *Caller) addDefaultHooks() {
Expand All @@ -130,8 +129,13 @@ func (c *Caller) addDefaultHooks() {
c.AddPostflightHook(MethodClaimSearch, postClaimSearchArfleetThumbs, builtinHookName)
c.AddPostflightHook(MethodResolve, postResolveArfleetThumbs, builtinHookName)
}

// This should be applied after all preflight hooks had a chance
c.AddPreflightHook(MethodResolve, preflightCacheHook, "cache")
c.AddPreflightHook(MethodClaimSearch, preflightCacheHook, "cache")
}

// CloneWithoutHook is for testing and debugging purposes.
func (c *Caller) CloneWithoutHook(endpoint, method, name string) *Caller {
cc := NewCaller(endpoint, c.userID)
for _, h := range c.postflightHooks {
Expand Down Expand Up @@ -184,7 +188,7 @@ func (c *Caller) call(ctx context.Context, req *jsonrpc.RPCRequest) (*jsonrpc.RP
return nil, err
}

// Applying preflight hooks
// Applying preflight hooks, if any one of them returns, this will be returned as response
var res *jsonrpc.RPCResponse
ctx = AttachQuery(ctx, q)
for _, hook := range c.preflightHooks {
Expand All @@ -199,27 +203,7 @@ func (c *Caller) call(ctx context.Context, req *jsonrpc.RPCRequest) (*jsonrpc.RP
}
}

if res == nil {
// Attempt to retrieve the result from cache, retrieving and setting it if it's missing,
// and only send the query directly if it's still missing after the cache call somehow.
var ires interface{}
retriever := func() (interface{}, error) { return c.SendQuery(ctx, q) }
if q.IsCacheable() && c.Cache != nil {
ires, err = c.Cache.Retrieve(q.Method(), q.Params(), retriever)
if err != nil {
return nil, rpcerrors.NewSDKError(err)
}
res, _ = ires.(*jsonrpc.RPCResponse)
}
if res == nil {
res, err = c.SendQuery(ctx, q)
}
if err != nil {
return nil, rpcerrors.NewSDKError(err)
}
}

return res, nil
return c.SendQuery(ctx, q)
}

func (c *Caller) SendQuery(ctx context.Context, q *Query) (*jsonrpc.RPCResponse, error) {
Expand Down
Loading

0 comments on commit 410a636

Please sign in to comment.