Skip to content

Commit

Permalink
Merge pull request #1990 from josephschorr/cache-improvements
Browse files Browse the repository at this point in the history
Switch caching package's interface to be generic and add experimental flag to try different caches
  • Loading branch information
josephschorr authored Jul 25, 2024
2 parents 62be65d + a90724e commit 383a05f
Show file tree
Hide file tree
Showing 34 changed files with 437 additions and 153 deletions.
2 changes: 2 additions & 0 deletions e2e/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5
go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand Down
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ require (
)

require (
github.com/Yiling-J/theine-go v0.3.2
github.com/gosimple/slug v1.14.0
github.com/lithammer/fuzzysearch v1.1.8
)
Expand Down Expand Up @@ -137,16 +138,21 @@ require (
github.com/cilium/ebpf v0.9.1 // indirect
github.com/containerd/cgroups/v3 v3.0.1 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/dolthub/maphash v0.1.0 // indirect
github.com/gammazero/deque v0.2.1 // indirect
github.com/go-viper/mapstructure/v2 v2.0.0 // indirect
github.com/godbus/dbus/v5 v5.0.6 // indirect
github.com/golangci/modinfo v0.3.4 // indirect
github.com/gosimple/unidecode v1.0.1 // indirect
github.com/jjti/go-spancheck v0.6.1 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/lasiar/canonicalheader v1.1.1 // indirect
github.com/maypok86/otter v1.2.1 // indirect
github.com/opencontainers/runtime-spec v1.0.3-0.20210326190908-1c3f411f0417 // indirect
github.com/quasilyte/go-ruleguard/dsl v0.3.22 // indirect
github.com/samber/slog-common v0.17.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go-simpler.org/musttag v0.12.2 // indirect
golang.org/x/telemetry v0.0.0-20240522233618-39ace7a40ae7 // indirect
)
Expand Down
11 changes: 11 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,8 @@ github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/OpenPeeDeeP/depguard/v2 v2.2.0 h1:vDfG60vDtIuf0MEOhmLlLLSzqaRM8EMcgJPdp74zmpA=
github.com/OpenPeeDeeP/depguard/v2 v2.2.0/go.mod h1:CIzddKRvLBC4Au5aYP/i3nyaWQ+ClszLIuVocRiCYFQ=
github.com/Yiling-J/theine-go v0.3.2 h1:XcSdMPV9DwBD9gqqSxbBfVJnP8CCiqNSqp3C6YpmMHI=
github.com/Yiling-J/theine-go v0.3.2/go.mod h1:ygLXqrWPZT/a+PzK5hQ0+a6gu0lpAY5IudTcgnPleqI=
github.com/ajstarks/deck v0.0.0-20200831202436-30c9fc6549a9/go.mod h1:JynElWSGnm/4RlzPXRlREEwqTHAN3T56Bv2ITsFT3gY=
github.com/ajstarks/deck/generate v0.0.0-20210309230005-c3f852c02e19/go.mod h1:T13YZdzov6OU0A1+RfKZiZN9ca6VeKdBdyDV+BY97Tk=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
Expand Down Expand Up @@ -850,6 +852,8 @@ github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
github.com/dolthub/maphash v0.1.0 h1:bsQ7JsF4FkkWyrP3oCnFJgrCUAFbFf3kOl4L/QxPDyQ=
github.com/dolthub/maphash v0.1.0/go.mod h1:gkg4Ch4CdCDu5h6PMriVLawB7koZ+5ijb9puGMV50a4=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
Expand Down Expand Up @@ -904,6 +908,8 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/fzipp/gocyclo v0.6.0 h1:lsblElZG7d3ALtGMx9fmxeTKZaLLpU8mET09yN4BBLo=
github.com/fzipp/gocyclo v0.6.0/go.mod h1:rXPyn8fnlpa0R2csP/31uerbiVBugk5whMdlyaLkLoA=
github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0=
github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/ghostiam/protogetter v0.3.6 h1:R7qEWaSgFCsy20yYHNIJsU9ZOb8TziSRRxuAOTVKeOk=
github.com/ghostiam/protogetter v0.3.6/go.mod h1:7lpeDnEJ1ZjL/YtyoN99ljO4z0pd3H0d18/t2dPBxHw=
Expand Down Expand Up @@ -1260,6 +1266,7 @@ github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down Expand Up @@ -1332,6 +1339,8 @@ github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A
github.com/mattn/go-sqlite3 v1.14.14 h1:qZgc/Rwetq+MtyE18WhzjokPD93dNqLGNT3QJuLvBGw=
github.com/mattn/go-sqlite3 v1.14.14/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/maypok86/otter v1.2.1 h1:xyvMW+t0vE1sKt/++GTkznLitEl7D/msqXkAbLwiC1M=
github.com/maypok86/otter v1.2.1/go.mod h1:mKLfoI7v1HOmQMwFgX4QkRk23mX6ge3RDvjdHOWG4R4=
github.com/mgechev/revive v1.3.7 h1:502QY0vQGe9KtYJ9FpxMz9rL+Fc/P13CI5POL4uHCcE=
github.com/mgechev/revive v1.3.7/go.mod h1:RJ16jUbF0OWC3co/+XTxmFNgEpUPwnnA0BRllX2aDNA=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY=
Expand Down Expand Up @@ -1644,7 +1653,9 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
gitlab.com/bosi/decorder v0.4.2 h1:qbQaV3zgwnBZ4zPMhGLW4KZe7A7NwxEhJx39R3shffo=
gitlab.com/bosi/decorder v0.4.2/go.mod h1:muuhHoaJkA9QLcYHq4Mj8FJUwDZ+EirSHRiaTcTf6T8=
Expand Down
10 changes: 6 additions & 4 deletions internal/datastore/proxy/schemacaching/caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,22 @@ const (
)

// DatastoreProxyTestCache returns a cache used for testing.
func DatastoreProxyTestCache(t testing.TB) cache.Cache {
cache, err := cache.NewCache(&cache.Config{
func DatastoreProxyTestCache(t testing.TB) cache.Cache[cache.StringKey, CacheEntry] {
cache, err := cache.NewStandardCache[cache.StringKey, CacheEntry](&cache.Config{
NumCounters: 1000,
MaxCost: 1 * humanize.MiByte,
})
require.Nil(t, err)
return cache
}

type CacheEntry = *cacheEntry

// NewCachingDatastoreProxy creates a new datastore proxy which caches definitions that
// are loaded at specific datastore revisions.
func NewCachingDatastoreProxy(delegate datastore.Datastore, c cache.Cache, gcWindow time.Duration, cachingMode CachingMode, watchHeartbeat time.Duration) datastore.Datastore {
func NewCachingDatastoreProxy(delegate datastore.Datastore, c cache.Cache[cache.StringKey, CacheEntry], gcWindow time.Duration, cachingMode CachingMode, watchHeartbeat time.Duration) datastore.Datastore {
if c == nil {
c = cache.NoopCache()
c = cache.NoopCache[cache.StringKey, CacheEntry]()
}

if cachingMode == JustInTimeCaching {
Expand Down
16 changes: 8 additions & 8 deletions internal/datastore/proxy/schemacaching/standardcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
// via the supplied cache.
type definitionCachingProxy struct {
datastore.Datastore
c cache.Cache
c cache.Cache[cache.StringKey, *cacheEntry]
readGroup singleflight.Group
}

Expand Down Expand Up @@ -120,13 +120,12 @@ func listAndCache[T schemaDefinition](
foundDefs := make([]datastore.RevisionedDefinition[T], 0, len(names))
for _, name := range names {
cacheRevisionKey := prefix + ":" + name + "@" + r.rev.String()
loadedRaw, found := r.p.c.Get(cacheRevisionKey)
loaded, found := r.p.c.Get(cache.StringKey(cacheRevisionKey))
if !found {
continue
}

remainingToLoad.Delete(name)
loaded := loadedRaw.(*cacheEntry)
foundDefs = append(foundDefs, datastore.RevisionedDefinition[T]{
Definition: loaded.definition.(T),
LastWrittenRevision: loaded.updated,
Expand All @@ -146,7 +145,7 @@ func listAndCache[T schemaDefinition](
cacheRevisionKey := prefix + ":" + def.Definition.GetName() + "@" + r.rev.String()
estimatedDefinitionSize := estimator(def.Definition.SizeVT())
entry := &cacheEntry{def.Definition, def.LastWrittenRevision, estimatedDefinitionSize, err}
r.p.c.Set(cacheRevisionKey, entry, entry.Size())
r.p.c.Set(cache.StringKey(cacheRevisionKey), entry, entry.Size())
}

// We have to call wait here or else Ristretto may not have the key(s)
Expand All @@ -167,11 +166,11 @@ func readAndCache[T schemaDefinition](
) (T, datastore.Revision, error) {
// Check the cache.
cacheRevisionKey := prefix + ":" + name + "@" + r.rev.String()
loadedRaw, found := r.p.c.Get(cacheRevisionKey)
loaded, found := r.p.c.Get(cache.StringKey(cacheRevisionKey))
if !found {
// We couldn't use the cached entry, load one
var err error
loadedRaw, err, _ = r.p.readGroup.Do(cacheRevisionKey, func() (any, error) {
loadedRaw, err, _ := r.p.readGroup.Do(cacheRevisionKey, func() (any, error) {
// sever the context so that another branch doesn't cancel the
// single-flighted read
loaded, updatedRev, err := reader(internaldatastore.SeparateContextWithTracing(ctx), name)
Expand All @@ -182,7 +181,7 @@ func readAndCache[T schemaDefinition](

estimatedDefinitionSize := estimator(loaded.SizeVT())
entry := &cacheEntry{loaded, updatedRev, estimatedDefinitionSize, err}
r.p.c.Set(cacheRevisionKey, entry, entry.Size())
r.p.c.Set(cache.StringKey(cacheRevisionKey), entry, entry.Size())

// We have to call wait here or else Ristretto may not have the key
// available to a subsequent caller.
Expand All @@ -192,9 +191,10 @@ func readAndCache[T schemaDefinition](
if err != nil {
return *new(T), datastore.NoRevision, err
}

loaded = loadedRaw.(*cacheEntry)
}

loaded := loadedRaw.(*cacheEntry)
return loaded.definition.(T), loaded.updated, loaded.notFound
}

Expand Down
2 changes: 1 addition & 1 deletion internal/datastore/proxy/schemacaching/watchingcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type watchingCachingProxy struct {
}

// createWatchingCacheProxy creates and returns a watching cache proxy.
func createWatchingCacheProxy(delegate datastore.Datastore, c cache.Cache, gcWindow time.Duration, watchHeartbeat time.Duration) *watchingCachingProxy {
func createWatchingCacheProxy(delegate datastore.Datastore, c cache.Cache[cache.StringKey, *cacheEntry], gcWindow time.Duration, watchHeartbeat time.Duration) *watchingCachingProxy {
fallbackCache := &definitionCachingProxy{
Datastore: delegate,
c: c,
Expand Down
34 changes: 14 additions & 20 deletions internal/datastore/proxy/schemacaching/watchingcache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,11 @@ import (
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/options"
corev1 "github.com/authzed/spicedb/pkg/proto/core/v1"
"github.com/authzed/spicedb/pkg/testutil"
)

var goleakIgnores = []goleak.Option{
goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("github.com/outcaste-io/ristretto.(*lfuPolicy).processItems"),
goleak.IgnoreTopFunction("github.com/outcaste-io/ristretto.(*Cache).processItems"),
goleak.IgnoreCurrent(),
}

func TestWatchingCacheBasicOperation(t *testing.T) {
defer goleak.VerifyNone(t, goleakIgnores...)
defer goleak.VerifyNone(t, append(testutil.GoLeakIgnores(), goleak.IgnoreCurrent())...)

fakeDS := &fakeDatastore{
headRevision: rev("0"),
Expand All @@ -35,7 +29,7 @@ func TestWatchingCacheBasicOperation(t *testing.T) {
errChan: make(chan error, 1),
}

wcache := createWatchingCacheProxy(fakeDS, cache.NoopCache(), 1*time.Hour, 100*time.Millisecond)
wcache := createWatchingCacheProxy(fakeDS, cache.NoopCache[cache.StringKey, *cacheEntry](), 1*time.Hour, 100*time.Millisecond)
require.NoError(t, wcache.startSync(context.Background()))

// Ensure no namespaces are found.
Expand Down Expand Up @@ -128,7 +122,7 @@ func TestWatchingCacheBasicOperation(t *testing.T) {
}

func TestWatchingCacheParallelOperations(t *testing.T) {
defer goleak.VerifyNone(t, goleakIgnores...)
defer goleak.VerifyNone(t, append(testutil.GoLeakIgnores(), goleak.IgnoreCurrent())...)

fakeDS := &fakeDatastore{
headRevision: rev("0"),
Expand All @@ -138,7 +132,7 @@ func TestWatchingCacheParallelOperations(t *testing.T) {
errChan: make(chan error, 1),
}

wcache := createWatchingCacheProxy(fakeDS, cache.NoopCache(), 1*time.Hour, 100*time.Millisecond)
wcache := createWatchingCacheProxy(fakeDS, cache.NoopCache[cache.StringKey, *cacheEntry](), 1*time.Hour, 100*time.Millisecond)
require.NoError(t, wcache.startSync(context.Background()))

// Run some operations in parallel.
Expand Down Expand Up @@ -184,7 +178,7 @@ func TestWatchingCacheParallelOperations(t *testing.T) {
}

func TestWatchingCacheParallelReaderWriter(t *testing.T) {
defer goleak.VerifyNone(t, goleakIgnores...)
defer goleak.VerifyNone(t, append(testutil.GoLeakIgnores(), goleak.IgnoreCurrent())...)

fakeDS := &fakeDatastore{
headRevision: rev("0"),
Expand All @@ -194,7 +188,7 @@ func TestWatchingCacheParallelReaderWriter(t *testing.T) {
errChan: make(chan error, 1),
}

wcache := createWatchingCacheProxy(fakeDS, cache.NoopCache(), 1*time.Hour, 100*time.Millisecond)
wcache := createWatchingCacheProxy(fakeDS, cache.NoopCache[cache.StringKey, *cacheEntry](), 1*time.Hour, 100*time.Millisecond)
require.NoError(t, wcache.startSync(context.Background()))

// Write somenamespace.
Expand Down Expand Up @@ -236,7 +230,7 @@ func TestWatchingCacheParallelReaderWriter(t *testing.T) {
}

func TestWatchingCacheFallbackToStandardCache(t *testing.T) {
defer goleak.VerifyNone(t, goleakIgnores...)
defer goleak.VerifyNone(t, append(testutil.GoLeakIgnores(), goleak.IgnoreCurrent())...)

fakeDS := &fakeDatastore{
headRevision: rev("0"),
Expand All @@ -246,10 +240,10 @@ func TestWatchingCacheFallbackToStandardCache(t *testing.T) {
errChan: make(chan error, 1),
}

c, err := cache.NewCache(&cache.Config{
c, err := cache.NewStandardCache[cache.StringKey, *cacheEntry](&cache.Config{
NumCounters: 1000,
MaxCost: 1000,
DefaultTTL: 1000 * time.Second,
MaxCost: 10000,
DefaultTTL: 10000 * time.Second,
})
require.NoError(t, err)

Expand All @@ -263,7 +257,7 @@ func TestWatchingCacheFallbackToStandardCache(t *testing.T) {

entry, ok := c.Get("n:somenamespace@1")
require.True(t, ok)
require.NotNil(t, entry.(*cacheEntry).notFound)
require.NotNil(t, entry.notFound)

// Disable reading and ensure it still works, via the fallback cache.
fakeDS.readsDisabled = true
Expand All @@ -278,7 +272,7 @@ func TestWatchingCacheFallbackToStandardCache(t *testing.T) {
}

func TestWatchingCachePrepopulated(t *testing.T) {
defer goleak.VerifyNone(t, goleakIgnores...)
defer goleak.VerifyNone(t, append(testutil.GoLeakIgnores(), goleak.IgnoreCurrent())...)

fakeDS := &fakeDatastore{
headRevision: rev("4"),
Expand All @@ -302,7 +296,7 @@ func TestWatchingCachePrepopulated(t *testing.T) {
},
}

c, err := cache.NewCache(&cache.Config{
c, err := cache.NewStandardCache[cache.StringKey, *cacheEntry](&cache.Config{
NumCounters: 1000,
MaxCost: 1000,
DefaultTTL: 1000 * time.Second,
Expand Down
10 changes: 5 additions & 5 deletions internal/dispatch/caching/caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ const (
// Dispatcher is a dispatcher with cacheInst-in caching.
type Dispatcher struct {
d dispatch.Dispatcher
c cache.Cache
c cache.Cache[keys.DispatchCacheKey, any]
keyHandler keys.Handler

checkTotalCounter prometheus.Counter
Expand All @@ -42,8 +42,8 @@ type Dispatcher struct {
lookupSubjectsFromCacheCounter prometheus.Counter
}

func DispatchTestCache(t testing.TB) cache.Cache {
cache, err := cache.NewCache(&cache.Config{
func DispatchTestCache(t testing.TB) cache.Cache[keys.DispatchCacheKey, any] {
cache, err := cache.NewStandardCache[keys.DispatchCacheKey, any](&cache.Config{
NumCounters: 1000,
MaxCost: 1 * humanize.MiByte,
})
Expand All @@ -53,9 +53,9 @@ func DispatchTestCache(t testing.TB) cache.Cache {

// NewCachingDispatcher creates a new dispatch.Dispatcher which delegates
// dispatch requests and caches the responses when possible and desirable.
func NewCachingDispatcher(cacheInst cache.Cache, metricsEnabled bool, prometheusSubsystem string, keyHandler keys.Handler) (*Dispatcher, error) {
func NewCachingDispatcher(cacheInst cache.Cache[keys.DispatchCacheKey, any], metricsEnabled bool, prometheusSubsystem string, keyHandler keys.Handler) (*Dispatcher, error) {
if cacheInst == nil {
cacheInst = cache.NoopCache()
cacheInst = cache.NoopCache[keys.DispatchCacheKey, any]()
}

checkTotalCounter := prometheus.NewCounter(prometheus.CounterOpts{
Expand Down
3 changes: 1 addition & 2 deletions internal/dispatch/caching/cachingdispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,7 @@ func TestMaxDepthCaching(t *testing.T) {
require.NoError(err)
require.Equal(v1.ResourceCheckResult_MEMBER, resp.ResultsByResourceId[parsed.ObjectId].Membership)

// We have to sleep a while to let the cache converge:
// https://github.com/outcaste-io/ristretto/blob/01b9f37dd0fd453225e042d6f3a27cd14f252cd0/cache_test.go#L17
// We have to sleep a while to let the cache converge
time.Sleep(10 * time.Millisecond)
}

Expand Down
4 changes: 2 additions & 2 deletions internal/dispatch/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Option func(*optionState)
type optionState struct {
metricsEnabled bool
prometheusSubsystem string
cache cache.Cache
cache cache.Cache[keys.DispatchCacheKey, any]
concurrencyLimits graph.ConcurrencyLimits
remoteDispatchTimeout time.Duration
dispatchChunkSize uint16
Expand All @@ -38,7 +38,7 @@ func PrometheusSubsystem(name string) Option {
}

// Cache sets the cache for the remote dispatcher.
func Cache(c cache.Cache) Option {
func Cache(c cache.Cache[keys.DispatchCacheKey, any]) Option {
return func(state *optionState) {
state.cache = c
}
Expand Down
4 changes: 2 additions & 2 deletions internal/dispatch/combined/combined.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type optionState struct {
upstreamCAPath string
grpcPresharedKey string
grpcDialOpts []grpc.DialOption
cache cache.Cache
cache cache.Cache[keys.DispatchCacheKey, any]
concurrencyLimits graph.ConcurrencyLimits
remoteDispatchTimeout time.Duration
secondaryUpstreamAddrs map[string]string
Expand Down Expand Up @@ -104,7 +104,7 @@ func GrpcDialOpts(opts ...grpc.DialOption) Option {
}

// Cache sets the cache for the dispatcher.
func Cache(c cache.Cache) Option {
func Cache(c cache.Cache[keys.DispatchCacheKey, any]) Option {
return func(state *optionState) {
state.cache = c
}
Expand Down
Loading

0 comments on commit 383a05f

Please sign in to comment.