Skip to content

Commit 9206964

Browse files
committed
feat(api): cache hot messages in redis (#35)
1 parent 6cf1e7b commit 9206964

File tree

6 files changed

+114
-18
lines changed

6 files changed

+114
-18
lines changed

src/server/api/go/internal/bootstrap/container.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,7 @@ func BuildContainer() *do.Injector {
7272
// Redis
7373
do.Provide(inj, func(i *do.Injector) (*redis.Client, error) {
7474
cfg := do.MustInvoke[*config.Config](i)
75-
rdb := cache.New(cfg)
76-
return rdb, nil
75+
return cache.New(cfg)
7776
})
7877

7978
// RabbitMQ Connection
@@ -161,6 +160,7 @@ func BuildContainer() *do.Injector {
161160
do.MustInvoke[*blob.S3Deps](i),
162161
do.MustInvoke[*mq.Publisher](i),
163162
do.MustInvoke[*config.Config](i),
163+
do.MustInvoke[*redis.Client](i),
164164
), nil
165165
})
166166
do.Provide(inj, func(i *do.Injector) (service.BlockService, error) {

src/server/api/go/internal/config/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ func setDefaults(v *viper.Viper) {
9090
v.SetDefault("database.dsn", "host=127.0.0.1 user=acontext password=helloworld dbname=acontext port=15432 sslmode=disable TimeZone=UTC")
9191
v.SetDefault("redis.addr", "127.0.0.1:16379")
9292
v.SetDefault("redis.password", "helloworld")
93+
v.SetDefault("redis.db", 0)
94+
v.SetDefault("redis.poolSize", 10)
9395
v.SetDefault("s3.endpoint", "http://127.0.0.1:19000")
9496
v.SetDefault("s3.internalEndpoint", "http://127.0.0.1:19000")
9597
v.SetDefault("s3.region", "auto")

src/server/api/go/internal/infra/cache/redis.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,19 @@ import (
77
"github.com/redis/go-redis/v9"
88
)
99

10-
func New(cfg *config.Config) *redis.Client {
10+
func New(cfg *config.Config) (*redis.Client, error) {
1111
rdb := redis.NewClient(&redis.Options{
1212
Addr: cfg.Redis.Addr,
1313
Password: cfg.Redis.Password,
1414
DB: cfg.Redis.DB,
1515
PoolSize: cfg.Redis.PoolSize,
1616
})
1717

18-
_ = rdb.Ping(context.Background()).Err()
18+
if err := rdb.Ping(context.Background()).Err(); err != nil {
19+
return nil, err
20+
}
1921

20-
return rdb
22+
return rdb, nil
2123
}
2224

2325
func Close(rdb *redis.Client) error {

src/server/api/go/internal/modules/service/session.go

Lines changed: 97 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"sort"
99
"time"
1010

11+
"github.com/bytedance/sonic"
1112
"github.com/go-playground/validator/v10"
1213
"github.com/google/uuid"
1314
"github.com/memodb-io/Acontext/internal/config"
@@ -16,6 +17,7 @@ import (
1617
"github.com/memodb-io/Acontext/internal/modules/model"
1718
"github.com/memodb-io/Acontext/internal/modules/repo"
1819
"github.com/memodb-io/Acontext/internal/pkg/paging"
20+
"github.com/redis/go-redis/v9"
1921
"go.uber.org/zap"
2022
"gorm.io/datatypes"
2123
)
@@ -37,16 +39,25 @@ type sessionService struct {
3739
s3 *blob.S3Deps
3840
publisher *mq.Publisher
3941
cfg *config.Config
42+
redis *redis.Client
4043
}
4144

42-
func NewSessionService(sessionRepo repo.SessionRepo, assetReferenceRepo repo.AssetReferenceRepo, log *zap.Logger, s3 *blob.S3Deps, publisher *mq.Publisher, cfg *config.Config) SessionService {
45+
const (
46+
// Redis key prefix for message parts cache
47+
redisKeyPrefixParts = "message:parts:"
48+
// Default TTL for message parts cache (1 hour)
49+
defaultPartsCacheTTL = time.Hour
50+
)
51+
52+
func NewSessionService(sessionRepo repo.SessionRepo, assetReferenceRepo repo.AssetReferenceRepo, log *zap.Logger, s3 *blob.S3Deps, publisher *mq.Publisher, cfg *config.Config, redis *redis.Client) SessionService {
4353
return &sessionService{
4454
sessionRepo: sessionRepo,
4555
assetReferenceRepo: assetReferenceRepo,
4656
log: log,
4757
s3: s3,
4858
publisher: publisher,
4959
cfg: cfg,
60+
redis: redis,
5061
}
5162
}
5263

@@ -239,6 +250,14 @@ func (s *sessionService) SendMessage(ctx context.Context, in SendMessageInput) (
239250
return nil, fmt.Errorf("increment asset reference: %w", err)
240251
}
241252

253+
// Cache parts data in Redis after successful S3 upload
254+
if s.redis != nil {
255+
if err := s.cachePartsInRedis(ctx, asset.SHA256, parts); err != nil {
256+
// Log error but don't fail the request if Redis caching fails
257+
s.log.Warn("failed to cache parts in Redis", zap.String("sha256", asset.SHA256), zap.Error(err))
258+
}
259+
}
260+
242261
// Prepare message metadata
243262
messageMeta := in.MessageMeta
244263
if messageMeta == nil {
@@ -312,14 +331,35 @@ func (s *sessionService) GetMessages(ctx context.Context, in GetMessagesInput) (
312331
for i, m := range msgs {
313332
meta := m.PartsAssetMeta.Data()
314333

315-
// Only download parts if blob service is available
316-
if s.s3 != nil {
317-
parts := []model.Part{}
334+
// Try to get parts from Redis cache first, fallback to S3 if not found
335+
parts := []model.Part{}
336+
cacheHit := false
337+
338+
if s.redis != nil {
339+
if cachedParts, err := s.getPartsFromRedis(ctx, meta.SHA256); err == nil {
340+
parts = cachedParts
341+
cacheHit = true
342+
} else if err != redis.Nil {
343+
// Log actual Redis errors (not cache misses)
344+
s.log.Warn("failed to get parts from Redis", zap.String("sha256", meta.SHA256), zap.Error(err))
345+
}
346+
}
347+
348+
// If cache miss, download from S3
349+
if !cacheHit && s.s3 != nil {
318350
if err := s.s3.DownloadJSON(ctx, meta.S3Key, &parts); err != nil {
319351
continue
320352
}
321-
msgs[i].Parts = parts
353+
// Cache the parts in Redis after successful S3 download
354+
if s.redis != nil {
355+
if err := s.cachePartsInRedis(ctx, meta.SHA256, parts); err != nil {
356+
// Log error but don't fail the request if Redis caching fails
357+
s.log.Warn("failed to cache parts in Redis", zap.String("sha256", meta.SHA256), zap.Error(err))
358+
}
359+
}
322360
}
361+
362+
msgs[i].Parts = parts
323363
}
324364

325365
// Always sort messages from old to new (ascending by created_at)
@@ -363,3 +403,55 @@ func (s *sessionService) GetMessages(ctx context.Context, in GetMessagesInput) (
363403

364404
return out, nil
365405
}
406+
407+
// cachePartsInRedis stores message parts in Redis with a fixed TTL
408+
func (s *sessionService) cachePartsInRedis(ctx context.Context, sha256 string, parts []model.Part) error {
409+
if s.redis == nil {
410+
return errors.New("redis client is not available")
411+
}
412+
413+
// Serialize parts to JSON
414+
jsonData, err := sonic.Marshal(parts)
415+
if err != nil {
416+
return fmt.Errorf("marshal parts to JSON: %w", err)
417+
}
418+
419+
// Use SHA256 as part of Redis key for content-based caching
420+
redisKey := redisKeyPrefixParts + sha256
421+
422+
// Store in Redis with fixed TTL
423+
if err := s.redis.Set(ctx, redisKey, jsonData, defaultPartsCacheTTL).Err(); err != nil {
424+
return fmt.Errorf("set Redis key %s: %w", redisKey, err)
425+
}
426+
427+
return nil
428+
}
429+
430+
// getPartsFromRedis retrieves message parts from Redis cache
431+
// Returns (nil, redis.Nil) on cache miss, which is a normal condition
432+
func (s *sessionService) getPartsFromRedis(ctx context.Context, sha256 string) ([]model.Part, error) {
433+
if s.redis == nil {
434+
return nil, errors.New("redis client is not available")
435+
}
436+
437+
redisKey := redisKeyPrefixParts + sha256
438+
439+
// Get from Redis
440+
val, err := s.redis.Get(ctx, redisKey).Result()
441+
if err != nil {
442+
// redis.Nil means key doesn't exist (cache miss), which is normal
443+
if err == redis.Nil {
444+
return nil, redis.Nil
445+
}
446+
// Other errors are actual Redis errors
447+
return nil, fmt.Errorf("get Redis key %s: %w", redisKey, err)
448+
}
449+
450+
// Deserialize JSON to parts
451+
var parts []model.Part
452+
if err := sonic.Unmarshal([]byte(val), &parts); err != nil {
453+
return nil, fmt.Errorf("unmarshal parts from JSON: %w", err)
454+
}
455+
456+
return parts, nil
457+
}

src/server/api/go/internal/modules/service/session_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ func TestSessionService_Create(t *testing.T) {
180180
},
181181
},
182182
}
183-
service := NewSessionService(repo, mockAssetRefRepo, logger, nil, nil, cfg)
183+
service := NewSessionService(repo, mockAssetRefRepo, logger, nil, nil, cfg, nil)
184184

185185
err := service.Create(ctx, tt.session)
186186

@@ -258,7 +258,7 @@ func TestSessionService_Delete(t *testing.T) {
258258
},
259259
},
260260
}
261-
service := NewSessionService(repo, mockAssetRefRepo, logger, nil, nil, cfg)
261+
service := NewSessionService(repo, mockAssetRefRepo, logger, nil, nil, cfg, nil)
262262

263263
err := service.Delete(ctx, tt.projectID, tt.sessionID)
264264

@@ -343,7 +343,7 @@ func TestSessionService_GetByID(t *testing.T) {
343343
},
344344
},
345345
}
346-
service := NewSessionService(repo, mockAssetRefRepo, logger, nil, nil, cfg)
346+
service := NewSessionService(repo, mockAssetRefRepo, logger, nil, nil, cfg, nil)
347347

348348
result, err := service.GetByID(ctx, tt.session)
349349

@@ -415,7 +415,7 @@ func TestSessionService_UpdateByID(t *testing.T) {
415415
},
416416
},
417417
}
418-
service := NewSessionService(repo, mockAssetRefRepo, logger, nil, nil, cfg)
418+
service := NewSessionService(repo, mockAssetRefRepo, logger, nil, nil, cfg, nil)
419419

420420
err := service.UpdateByID(ctx, tt.session)
421421

@@ -553,7 +553,7 @@ func TestSessionService_List(t *testing.T) {
553553
},
554554
},
555555
}
556-
service := NewSessionService(repo, mockAssetRefRepo, logger, nil, nil, cfg)
556+
service := NewSessionService(repo, mockAssetRefRepo, logger, nil, nil, cfg, nil)
557557

558558
result, err := service.List(ctx, tt.input)
559559

@@ -790,7 +790,7 @@ func TestSessionService_GetMessages(t *testing.T) {
790790
},
791791
}
792792
// Note: blob is nil in test, so GetMessages will skip DownloadJSON and PresignGet
793-
service := NewSessionService(repo, mockAssetRefRepo, logger, nil, nil, cfg)
793+
service := NewSessionService(repo, mockAssetRefRepo, logger, nil, nil, cfg, nil)
794794

795795
result, err := service.GetMessages(ctx, tt.input)
796796

@@ -950,7 +950,7 @@ func TestSessionService_GetMessages_SortOrder(t *testing.T) {
950950
},
951951
},
952952
}
953-
service := NewSessionService(repo, mockAssetRefRepo, logger, nil, nil, cfg)
953+
service := NewSessionService(repo, mockAssetRefRepo, logger, nil, nil, cfg, nil)
954954

955955
result, err := service.GetMessages(ctx, tt.input)
956956

src/server/docker-compose.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ services:
163163
DATABASE_PASSWORD: ${DATABASE_PASSWORD:-helloworld}
164164
DATABASE_NAME: ${DATABASE_NAME:-acontext}
165165
DATABASE_EXPORT_PORT: ${DATABASE_EXPORT_PORT:-5432}
166-
REDIST_HOST: acontext-server-redis
166+
REDIS_HOST: acontext-server-redis
167167
REDIS_PASSWORD: ${REDIS_PASSWORD:-helloworld}
168168
REDIS_EXPORT_PORT: ${REDIS_EXPORT_PORT:-6379}
169169
RABBITMQ_HOST: ${RABBITMQ_HOST:-acontext-server-rabbitmq}

0 commit comments

Comments
 (0)