Skip to content

Commit 115a0f2

Browse files
authored
use appName as prefix (#4)
* use appName as prefix * ci
1 parent 6ca1b74 commit 115a0f2

File tree

3 files changed

+98
-42
lines changed

3 files changed

+98
-42
lines changed

.github/workflows/go.yml

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
# This workflow will build a golang project
2+
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go
3+
4+
name: Go
5+
6+
on:
7+
push:
8+
branches: [ "main" ]
9+
pull_request:
10+
branches: [ "main" ]
11+
12+
jobs:
13+
14+
build:
15+
runs-on: ubuntu-latest
16+
services:
17+
redis:
18+
image: redis
19+
ports:
20+
- 6379:6379
21+
postgres:
22+
image: postgres:14.5
23+
env:
24+
POSTGRES_PASSWORD: my-secret
25+
POSTGRES_DB: wpgx_test_db
26+
PGTZ: 'America/Los_Angeles'
27+
ports:
28+
- 5432:5432
29+
steps:
30+
- uses: actions/checkout@v4
31+
32+
- name: Set up Go
33+
uses: actions/setup-go@v4
34+
with:
35+
go-version: '1.22'
36+
37+
- name: Build
38+
run: go build -v ./...
39+
40+
- name: Golangci-lint
41+
uses: golangci/[email protected]
42+
43+
- name: Test
44+
run: make test-cmd

cache.go

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ import (
2020
)
2121

2222
const (
23-
lockSuffix = "_LOCK"
24-
delimiter = "~|~"
23+
lockSuffix = "_LOCK"
24+
delimiter = "~|~"
25+
appNameDelimiter = ":"
2526

2627
// Duration to sleep before try to get another distributed lock for single flight.
2728
lockSleep = 50 * time.Millisecond
@@ -84,6 +85,7 @@ type ValueBytesExpiredAt struct {
8485

8586
// DCache implements cache.
8687
type DCache struct {
88+
appName string
8789
conn redis.UniversalClient
8890
readInterval time.Duration
8991
group singleflight.Group
@@ -140,6 +142,7 @@ func NewDCache(
140142

141143
ctx, cancel := context.WithCancel(context.Background())
142144
c := &DCache{
145+
appName: appName,
143146
conn: primaryClient,
144147
stats: stats,
145148
tracer: tracer,
@@ -282,7 +285,7 @@ func (c *DCache) setKey(ctx context.Context, key string, valueBytes []byte, ttl
282285
if err != nil {
283286
return err
284287
}
285-
err = c.conn.Set(ctx, storeKey(key), veBytes, ttl).Err()
288+
err = c.conn.Set(ctx, c.storeKey(key), veBytes, ttl).Err()
286289
if err != nil {
287290
return err
288291
}
@@ -292,7 +295,7 @@ func (c *DCache) setKey(ctx context.Context, key string, valueBytes []byte, ttl
292295

293296
// tryReadFromRedis try to read value from Redis.
294297
func (c *DCache) tryReadFromRedis(ctx context.Context, key string) (*ValueBytesExpiredAt, error) {
295-
veBytes, err := c.conn.Get(ctx, storeKey(key)).Bytes()
298+
veBytes, err := c.conn.Get(ctx, c.storeKey(key)).Bytes()
296299
if err != nil {
297300
return nil, err
298301
}
@@ -311,7 +314,7 @@ func (c *DCache) updateMemoryCache(
311314
ttl = c.memCacheMaxTTLSeconds
312315
}
313316
if c.inMemCache != nil && ttl > 0 {
314-
memValue, err := c.inMemCache.Get([]byte(storeKey(key)))
317+
memValue, err := c.inMemCache.Get([]byte(c.storeKey(key)))
315318
// Broadcast invalidation request only when value is explicitly set to new one,
316319
// by Set(), instead of backfilled from Redis, and if
317320
// (1) The value does not exist before
@@ -320,27 +323,27 @@ func (c *DCache) updateMemoryCache(
320323
if isExplicitSet {
321324
if err == freecache.ErrNotFound ||
322325
(err == nil && !bytes.Equal(ve.ValueBytes, memValue)) {
323-
c.broadcastKeyInvalidate(storeKey(key))
326+
c.broadcastKeyInvalidate(c.storeKey(key))
324327
}
325328
}
326329
// ignore in memory cache error
327-
err = c.inMemCache.Set([]byte(storeKey(key)), ve.ValueBytes, int(ttl))
330+
err = c.inMemCache.Set([]byte(c.storeKey(key)), ve.ValueBytes, int(ttl))
328331
if err != nil {
329-
log.Ctx(ctx).Err(err).Msgf("Failed to set memory cache for key %s", storeKey(key))
332+
log.Ctx(ctx).Err(err).Msgf("Failed to set memory cache for key %s", c.storeKey(key))
330333
c.recordError(errLabelSetMemCache)
331334
}
332335
}
333336
}
334337

335338
// deleteKey delete key in redis and inMemCache
336339
func (c *DCache) deleteKey(ctx context.Context, key string) error {
337-
n, err := c.conn.Del(ctx, storeKey(key)).Result()
340+
n, err := c.conn.Del(ctx, c.storeKey(key)).Result()
338341
if err != nil {
339342
return err
340343
}
341344
if n > 0 {
342345
if c.inMemCache != nil {
343-
c.inMemCache.Del([]byte(storeKey(key)))
346+
c.inMemCache.Del([]byte(c.storeKey(key)))
344347
c.broadcastKeyInvalidate(key)
345348
}
346349
}
@@ -350,7 +353,7 @@ func (c *DCache) deleteKey(ctx context.Context, key string) error {
350353
// broadcastKeyInvalidate pushes key into a list and wait for broadcast
351354
func (c *DCache) broadcastKeyInvalidate(key string) {
352355
c.invalidateMu.Lock()
353-
c.invalidateKeys[storeKey(key)] = struct{}{}
356+
c.invalidateKeys[c.storeKey(key)] = struct{}{}
354357
l := len(c.invalidateKeys)
355358
c.invalidateMu.Unlock()
356359
if l == maxInvalidate {
@@ -442,12 +445,12 @@ func (c *DCache) updateMetrics() {
442445
}
443446
}
444447

445-
func storeKey(key string) string {
446-
return fmt.Sprintf(":{%s}", key)
448+
func (c *DCache) storeKey(key string) string {
449+
return c.appName + appNameDelimiter + fmt.Sprintf("{%s}", key)
447450
}
448451

449-
func lockKey(key string) string {
450-
return fmt.Sprintf(":%s%s", storeKey(key), lockSuffix)
452+
func (c *DCache) lockKey(key string) string {
453+
return c.storeKey(key) + lockSuffix
451454
}
452455

453456
// Get will read the value from cache if exists or call read() to retrieve the value and
@@ -516,7 +519,7 @@ func (c *DCache) GetWithTtl(ctx context.Context, key string, target any, read Re
516519
// lookup in memory cache, return only when unmarshal succeeded.
517520
if c.inMemCache != nil {
518521
var targetBytes []byte
519-
targetBytes, err = c.inMemCache.Get([]byte(storeKey(key)))
522+
targetBytes, err = c.inMemCache.Get([]byte(c.storeKey(key)))
520523
if err == nil {
521524
err = unmarshal(targetBytes, target)
522525
if err == nil {
@@ -532,7 +535,7 @@ func (c *DCache) GetWithTtl(ctx context.Context, key string, target any, read Re
532535

533536
var anyTypedBytes any
534537
var targetHasUnmarshalled bool
535-
anyTypedBytes, err, _ = c.group.Do(lockKey(key), func() (any, error) {
538+
anyTypedBytes, err, _ = c.group.Do(c.lockKey(key), func() (any, error) {
536539
// distributed single flight to query db for value.
537540
for {
538541
ve, e := c.tryReadFromRedis(ctx, key)
@@ -573,7 +576,7 @@ func (c *DCache) GetWithTtl(ctx context.Context, key string, target any, read Re
573576
// To avoid spamming Redis with SetNX requests, only one request should try to get
574577
// the lock per-pod.
575578
// If timeout or not cache-able error, another thread will obtain lock after sleep.
576-
updated, err := c.conn.SetNX(ctx, lockKey(key), "", c.readInterval).Result()
579+
updated, err := c.conn.SetNX(ctx, c.lockKey(key), "", c.readInterval).Result()
577580
if err != nil {
578581
log.Ctx(ctx).Err(err).Msgf("Failed to get lock by SetNX for %s", key)
579582
c.recordError(errLabelSetRedis)

cache_test.go

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -113,25 +113,26 @@ func newTestSuite() *testSuite {
113113
Addr: fmt.Sprintf("127.0.0.1:6379"),
114114
DB: 10,
115115
})
116+
appName := "test"
116117
// max value size is: 100MB / 1024 = 100KB
117118
inMemCache := freecache.NewCache(100 * 1024 * 1024)
118-
cacheRepo, e := NewDCache("test", redisClient, inMemCache, time.Second, true, true)
119+
cacheRepo, e := NewDCache(appName, redisClient, inMemCache, time.Second, true, true)
119120
if e != nil {
120121
panic(e)
121122
}
122123
inMemCache2 := freecache.NewCache(1024 * 1024)
123-
cacheRepo2, e := NewDCache("test", redisClient, inMemCache2, time.Second, false, true)
124+
cacheRepo2, e := NewDCache(appName, redisClient, inMemCache2, time.Second, false, true)
124125
if e != nil {
125126
panic(e)
126127
}
127128
// max value size is: 100MB / 1024 = 100KB
128129
inMemCacheSf1 := freecache.NewCache(1024 * 1024)
129-
cacheRepoSf1, e := NewDCache("test", redisClient, inMemCacheSf1, time.Second, true, true, EnableRedisSingleFlightOption)
130+
cacheRepoSf1, e := NewDCache(appName, redisClient, inMemCacheSf1, time.Second, true, true, EnableRedisSingleFlightOption)
130131
if e != nil {
131132
panic(e)
132133
}
133134
inMemCacheSf2 := freecache.NewCache(1024 * 1024)
134-
cacheRepoSf2, e := NewDCache("test", redisClient, inMemCacheSf2, time.Second, true, true, EnableRedisSingleFlightOption)
135+
cacheRepoSf2, e := NewDCache(appName, redisClient, inMemCacheSf2, time.Second, true, true, EnableRedisSingleFlightOption)
135136
if e != nil {
136137
panic(e)
137138
}
@@ -229,13 +230,13 @@ func (suite *testSuite) TestPopulateCache() {
229230
suite.NoError(err)
230231
suite.Equal(v, vget)
231232

232-
redisBytes, err := suite.redisConn.Get(ctx, storeKey(queryKey)).Bytes()
233+
redisBytes, err := suite.redisConn.Get(ctx, suite.cacheRepo.storeKey(queryKey)).Bytes()
233234
suite.Require().NoError(err)
234235
vredis := &ValueBytesExpiredAt{}
235236
suite.Require().NoError(msgpack.Unmarshal(redisBytes, vredis))
236237
suite.Equal(ev, vredis.ValueBytes)
237238

238-
vinmem, e := suite.inMemCache.Get([]byte(storeKey(queryKey)))
239+
vinmem, e := suite.inMemCache.Get([]byte(suite.cacheRepo.storeKey(queryKey)))
239240
suite.Require().NoError(e)
240241
suite.Equal(ev, vinmem)
241242

@@ -247,7 +248,7 @@ func (suite *testSuite) TestPopulateCache() {
247248
suite.NoError(err)
248249
suite.Equal(v, vget2)
249250

250-
vinmem2, e := suite.inMemCache2.Get([]byte(storeKey(queryKey)))
251+
vinmem2, e := suite.inMemCache2.Get([]byte(suite.cacheRepo.storeKey(queryKey)))
251252
suite.NoError(e)
252253
suite.Equal(ev, vinmem2)
253254
}
@@ -387,42 +388,42 @@ func (suite *testSuite) TestPopulateCacheWithExpire() {
387388
suite.Equal(v2, vget2)
388389

389390
// get v1
390-
redisBytes, err := suite.redisConn.Get(ctx, storeKey(queryKey1)).Bytes()
391+
redisBytes, err := suite.redisConn.Get(ctx, suite.cacheRepo.storeKey(queryKey1)).Bytes()
391392
suite.Require().NoError(err)
392393
vredis := &ValueBytesExpiredAt{}
393394
suite.Require().NoError(msgpack.Unmarshal(redisBytes, vredis))
394395
suite.Equal(ev1, vredis.ValueBytes)
395396

396-
vinmem, e := suite.inMemCache.Get([]byte(storeKey(queryKey1)))
397+
vinmem, e := suite.inMemCache.Get([]byte(suite.cacheRepo.storeKey(queryKey1)))
397398
suite.NoError(e)
398399
suite.Equal(ev1, vinmem)
399400

400401
// get v2
401-
redisBytes, err = suite.redisConn.Get(ctx, storeKey(queryKey2)).Bytes()
402+
redisBytes, err = suite.redisConn.Get(ctx, suite.cacheRepo.storeKey(queryKey2)).Bytes()
402403
suite.Require().NoError(err)
403404
suite.Require().NoError(msgpack.Unmarshal(redisBytes, vredis))
404405
suite.Equal(ev2, vredis.ValueBytes)
405406

406-
vinmem, e = suite.inMemCache.Get([]byte(storeKey(queryKey2)))
407+
vinmem, e = suite.inMemCache.Get([]byte(suite.cacheRepo.storeKey(queryKey2)))
407408
suite.NoError(e)
408409
suite.Equal(ev2, vinmem)
409410

410411
time.Sleep(time.Second * 2)
411412

412413
// get v1, not exist
413-
redisExist := suite.redisConn.Exists(ctx, storeKey(queryKey1)).Val()
414+
redisExist := suite.redisConn.Exists(ctx, suite.cacheRepo.storeKey(queryKey1)).Val()
414415
suite.EqualValues(redisExist, 0)
415416

416-
_, e = suite.inMemCache.Get([]byte(storeKey(queryKey1)))
417+
_, e = suite.inMemCache.Get([]byte(suite.cacheRepo.storeKey(queryKey1)))
417418
suite.Error(e)
418419

419420
// get v2
420-
redisBytes, err = suite.redisConn.Get(ctx, storeKey(queryKey2)).Bytes()
421+
redisBytes, err = suite.redisConn.Get(ctx, suite.cacheRepo.storeKey(queryKey2)).Bytes()
421422
suite.Require().NoError(err)
422423
suite.Require().NoError(msgpack.Unmarshal(redisBytes, vredis))
423424
suite.Equal(ev2, vredis.ValueBytes)
424425

425-
vinmem, e = suite.inMemCache.Get([]byte(storeKey(queryKey2)))
426+
vinmem, e = suite.inMemCache.Get([]byte(suite.cacheRepo.storeKey(queryKey2)))
426427
suite.NoError(e)
427428
suite.Equal(ev2, vinmem)
428429
}
@@ -717,11 +718,11 @@ func (suite *testSuite) TestInvalidate() {
717718

718719
// Wait for key to be deleted
719720
time.Sleep(waitTime)
720-
exist, e := suite.redisConn.Exists(context.Background(), storeKey(queryKey)).Result()
721+
exist, e := suite.redisConn.Exists(context.Background(), suite.cacheRepo.storeKey(queryKey)).Result()
721722
suite.NoError(e)
722723
suite.EqualValues(0, exist)
723724

724-
_, e = suite.inMemCache.Get([]byte(storeKey(queryKey)))
725+
_, e = suite.inMemCache.Get([]byte(suite.cacheRepo.storeKey(queryKey)))
725726
suite.Equal(freecache.ErrNotFound, e)
726727
}
727728

@@ -742,13 +743,13 @@ func (suite *testSuite) TestSet() {
742743
err = suite.cacheRepo.Set(context.Background(), queryKey, newv, Normal.ToDuration())
743744
suite.NoError(err)
744745

745-
redisBytes, err := suite.redisConn.Get(context.Background(), storeKey(queryKey)).Bytes()
746+
redisBytes, err := suite.redisConn.Get(context.Background(), suite.cacheRepo.storeKey(queryKey)).Bytes()
746747
suite.Require().NoError(err)
747748
vredis := &ValueBytesExpiredAt{}
748749
suite.Require().NoError(msgpack.Unmarshal(redisBytes, vredis))
749750
suite.Equal(newve, vredis.ValueBytes)
750751

751-
vinmem, e := suite.inMemCache.Get([]byte(storeKey(queryKey)))
752+
vinmem, e := suite.inMemCache.Get([]byte(suite.cacheRepo.storeKey(queryKey)))
752753
suite.Require().NoError(e)
753754
suite.Equal(newve, vinmem)
754755

@@ -781,11 +782,11 @@ func (suite *testSuite) TestInvalidateKeyAcrossPods() {
781782
suite.NoError(err)
782783
suite.Equal(v, vget2)
783784

784-
vinmem, e := suite.inMemCache2.Get([]byte(storeKey(queryKey)))
785+
vinmem, e := suite.inMemCache2.Get([]byte(suite.cacheRepo2.storeKey(queryKey)))
785786
suite.NoError(e)
786787
suite.Equal(ve, vinmem)
787788

788-
vinmem, e = suite.inMemCache2.Get([]byte(storeKey(queryKey)))
789+
vinmem, e = suite.inMemCache2.Get([]byte(suite.cacheRepo2.storeKey(queryKey)))
789790
suite.NoError(e)
790791
suite.Equal(ve, vinmem)
791792

@@ -795,15 +796,15 @@ func (suite *testSuite) TestInvalidateKeyAcrossPods() {
795796

796797
// Wait for key to be broadcasted
797798
time.Sleep(time.Second)
798-
exist, e := suite.redisConn.Exists(context.Background(), storeKey(queryKey)).Result()
799+
exist, e := suite.redisConn.Exists(context.Background(), suite.cacheRepo.storeKey(queryKey)).Result()
799800
suite.NoError(e)
800801
suite.EqualValues(0, exist)
801802

802-
_, e = suite.inMemCache.Get([]byte(storeKey(queryKey)))
803+
_, e = suite.inMemCache.Get([]byte(suite.cacheRepo.storeKey(queryKey)))
803804
suite.Equal(freecache.ErrNotFound, e)
804805

805806
// check inmemcache of second pod is invalidated too
806-
_, e = suite.inMemCache2.Get([]byte(storeKey(queryKey)))
807+
_, e = suite.inMemCache2.Get([]byte(suite.cacheRepo2.storeKey(queryKey)))
807808
suite.Equal(freecache.ErrNotFound, e)
808809
}
809810

@@ -812,3 +813,11 @@ func (suite *testSuite) TestSetMemCacheMaxTTlSeconds() {
812813
suite.Require().Error(suite.cacheRepo.SetMemCacheMaxTTLSeconds(0))
813814
suite.Require().Error(suite.cacheRepo.SetMemCacheMaxTTLSeconds(1000000))
814815
}
816+
817+
func (suite *testSuite) TestStoreKey() {
818+
suite.Equal("test:{test}", suite.cacheRepo.storeKey("test"))
819+
}
820+
821+
func (suite *testSuite) TestLockKey() {
822+
suite.Equal("test:{test}_LOCK", suite.cacheRepo.lockKey("test"))
823+
}

0 commit comments

Comments
 (0)