Skip to content

Commit 29d19f4

Browse files
feat: add write-through disk caching & dynamic config (#43)
* add pct * cleanup * one more check * move helpers down * wip * add dynamic configuration endpoint * example region * only override config if no err * fix cache ids * update readme * remove word * wip * push store content benchmark --------- Co-authored-by: Luke Lombardi <[email protected]>
1 parent 533cd27 commit 29d19f4

File tree

12 files changed

+1438
-444
lines changed

12 files changed

+1438
-444
lines changed

README.md

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
1-
# blobcache
1+
# Blobcache
22

33
## Overview
4-
A very simple in-memory cache used as a content-addressed cache. Exposes a GRPC server that can be embedded directly in a golang application.
4+
Blobcache is a distributed, in-memory cache used to cache objects near the workloads that need them. It does not require an external metadata server
5+
for each region, making it easy to spin up in various regions without performance penalties.
6+
7+
## Features
8+
- **Tiered Storage**: Fast and efficient caching mechanism - both in-memory and on disk
9+
- **Content-Addressed**: Stores data based on content hash, so workloads sharing the same data (e.g., model weights) can benefit from the distributed cache.
10+
- **GRPC Interface**: Easily embeddable in Golang applications for seamless integration

e2e/throughput/main.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,14 @@ func main() {
5353
hashBytes := sha256.Sum256(b)
5454
fileHash := hex.EncodeToString(hashBytes[:])
5555

56-
hash, err := storeFile(client, filePath, fileHash)
56+
var totalStoreTime float64
57+
58+
hash, storeElapsedTime, err := storeFile(client, filePath, fileHash)
5759
if err != nil {
5860
log.Fatalf("Failed to store file: %v\n", err)
5961
}
62+
log.Printf("StoreContent elapsed time: %f seconds\n", storeElapsedTime)
63+
totalStoreTime += storeElapsedTime
6064

6165
var totalStreamResult, totalGetContentResult TestResult
6266
for i := 0; i < totalIterations; i++ {
@@ -83,10 +87,12 @@ func main() {
8387
log.Printf("TestGetContent - %v\n", getContentResult)
8488
}
8589

86-
GenerateReport(totalStreamResult, totalGetContentResult, len(b), totalIterations)
90+
GenerateReport(totalStreamResult, totalGetContentResult, totalStoreTime, len(b), totalIterations)
8791
}
8892

89-
func storeFile(client *blobcache.BlobCacheClient, filePath string, fileHash string) (string, error) {
93+
func storeFile(client *blobcache.BlobCacheClient, filePath string, fileHash string) (string, float64, error) {
94+
startTime := time.Now() // Start timing
95+
9096
chunks := make(chan []byte)
9197
go func() {
9298
file, err := os.Open(filePath)
@@ -116,9 +122,11 @@ func storeFile(client *blobcache.BlobCacheClient, filePath string, fileHash stri
116122

117123
hash, err := client.StoreContent(chunks, fileHash)
118124
if err != nil {
119-
return "", err
125+
return "", 0, err
120126
}
121-
return hash, nil
127+
128+
elapsedTime := time.Since(startTime).Seconds() // Calculate elapsed time
129+
return hash, elapsedTime, nil
122130
}
123131

124132
func TestGetContentStream(client *blobcache.BlobCacheClient, hash string, fileSize int, expectedHash string) (TestResult, error) {
@@ -212,13 +220,17 @@ func TestGetContent(client *blobcache.BlobCacheClient, hash string, fileSize int
212220
return TestResult{ElapsedTime: elapsedTime, ContentCheckPassed: contentCheckPassed}, nil
213221
}
214222

215-
func GenerateReport(streamResult, contentResult TestResult, fileSize, iterations int) {
223+
func GenerateReport(streamResult, contentResult TestResult, totalStoreTime float64, fileSize, iterations int) {
216224
averageTimeStream := streamResult.ElapsedTime / float64(iterations)
217225
averageTimeContent := contentResult.ElapsedTime / float64(iterations)
226+
averageTimeStore := totalStoreTime / float64(iterations)
218227
totalBytesReadMB := float64(fileSize*iterations) / (1024 * 1024)
219228
mbPerSecondStream := totalBytesReadMB / streamResult.ElapsedTime
220229
mbPerSecondContent := totalBytesReadMB / contentResult.ElapsedTime
221230

231+
log.Printf("================================================")
232+
log.Printf("Total time for StoreContent: %f seconds\n", totalStoreTime)
233+
log.Printf("Average time per iteration for StoreContent: %f seconds\n", averageTimeStore)
222234
log.Printf("Total time for GetContentStream: %f seconds\n", streamResult.ElapsedTime)
223235
log.Printf("Average time per iteration for GetContentStream: %f seconds\n", averageTimeStream)
224236
log.Printf("Total time for GetContent: %f seconds\n", contentResult.ElapsedTime)

pkg/config.default.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
server:
22
mode: slave
3+
diskCacheDir: /blobcache-data
4+
diskCacheMaxUsagePct: 0.95
35
token:
46
rttThresholdMilliseconds: 100
57
hostStorageCapacityThresholdPct: 0.95
@@ -10,6 +12,12 @@ server:
1012
redisAddr:
1113
redisPasswd:
1214
redisTLSEnabled: false
15+
regions:
16+
us-east-1:
17+
token:
18+
rttThresholdMilliseconds: 40
19+
hostStorageCapacityThresholdPct: 0.5
20+
maxCachePct: 50
1321
client:
1422
token:
1523
nTopHosts: 3

pkg/coordinator_local.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ type CoordinatorClient interface {
88
AddHostToIndex(ctx context.Context, locality string, host *BlobCacheHost) error
99
SetHostKeepAlive(ctx context.Context, locality string, host *BlobCacheHost) error
1010
GetAvailableHosts(ctx context.Context, locality string) ([]*BlobCacheHost, error)
11+
GetRegionConfig(ctx context.Context, locality string) (BlobCacheServerConfig, error)
1112
SetClientLock(ctx context.Context, hash string, host string) error
1213
RemoveClientLock(ctx context.Context, hash string, host string) error
1314
SetStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error
@@ -35,6 +36,10 @@ func NewCoordinatorClientLocal(globalConfig BlobCacheGlobalConfig, serverConfig
3536
return &CoordinatorClientLocal{globalConfig: globalConfig, serverConfig: serverConfig, metadata: metadata}, nil
3637
}
3738

39+
func (c *CoordinatorClientLocal) GetRegionConfig(ctx context.Context, locality string) (BlobCacheServerConfig, error) {
40+
return c.serverConfig, nil
41+
}
42+
3843
func (c *CoordinatorClientLocal) AddHostToIndex(ctx context.Context, locality string, host *BlobCacheHost) error {
3944
return c.metadata.AddHostToIndex(ctx, locality, host)
4045
}

pkg/coordinator_remote.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,20 @@ func (c *CoordinatorClientRemote) GetAvailableHosts(ctx context.Context, localit
132132
return hosts, nil
133133
}
134134

135+
func (c *CoordinatorClientRemote) GetRegionConfig(ctx context.Context, locality string) (BlobCacheServerConfig, error) {
136+
response, err := c.client.GetRegionConfig(ctx, &proto.GetRegionConfigRequest{Locality: locality})
137+
if err != nil {
138+
return BlobCacheServerConfig{}, err
139+
}
140+
141+
if !response.Ok {
142+
return BlobCacheServerConfig{}, errors.New("failed to get region config")
143+
}
144+
145+
cfg := BlobCacheServerConfigFromProto(response.ServerConfig)
146+
return cfg, nil
147+
}
148+
135149
func (c *CoordinatorClientRemote) SetClientLock(ctx context.Context, hash string, hostId string) error {
136150
r, err := c.client.SetClientLock(ctx, &proto.SetClientLockRequest{Hash: hash, HostId: hostId})
137151
if err != nil {

pkg/server.go

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ type CacheService struct {
5050
}
5151

5252
func NewCacheService(ctx context.Context, cfg BlobCacheConfig, locality string) (*CacheService, error) {
53-
hostId := fmt.Sprintf("%s-%s", BlobCacheHostPrefix, uuid.New().String()[:6])
5453
currentHost := &BlobCacheHost{
5554
RTT: 0,
5655
}
@@ -62,12 +61,29 @@ func NewCacheService(ctx context.Context, cfg BlobCacheConfig, locality string)
6261
coordinator, err = NewCoordinatorClientLocal(cfg.Global, cfg.Server)
6362
default:
6463
coordinator, err = NewCoordinatorClientRemote(cfg.Global, cfg.Client.Token)
64+
if err != nil {
65+
return nil, err
66+
}
67+
68+
regionConfig, err := coordinator.GetRegionConfig(ctx, locality)
69+
if err != nil {
70+
Logger.Infof("No region-specific config found for locality %s, using current config", locality)
71+
} else {
72+
cfg.Server = regionConfig
73+
}
6574
}
6675
if err != nil {
6776
return nil, err
6877
}
6978

70-
Logger.Infof("Server started in %s mode", cfg.Server.Mode)
79+
// Create the disk cache directory if it doesn't exist
80+
err = os.MkdirAll(cfg.Server.DiskCacheDir, 0755)
81+
if err != nil {
82+
return nil, err
83+
}
84+
85+
hostId := getHostId(cfg.Server)
86+
Logger.Infof("Server<%s> started in %s mode", hostId, cfg.Server.Mode)
7187

7288
publicIpAddr, _ := GetPublicIpAddr()
7389
if publicIpAddr != "" {
@@ -116,6 +132,20 @@ func NewCacheService(ctx context.Context, cfg BlobCacheConfig, locality string)
116132
return cs, nil
117133
}
118134

135+
func getHostId(serverConfig BlobCacheServerConfig) string {
136+
filePath := filepath.Join(serverConfig.DiskCacheDir, "HOST_ID")
137+
138+
hostId := ""
139+
if content, err := os.ReadFile(filePath); err == nil {
140+
hostId = strings.TrimSpace(string(content))
141+
} else {
142+
hostId = fmt.Sprintf("%s-%s", BlobCacheHostPrefix, uuid.New().String()[:6])
143+
os.WriteFile(filePath, []byte(hostId), 0644)
144+
}
145+
146+
return hostId
147+
}
148+
119149
func (cs *CacheService) HostKeepAlive() {
120150
err := cs.coordinator.SetHostKeepAlive(cs.ctx, cs.locality, cs.cas.currentHost)
121151
if err != nil {

pkg/server_coordinator.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,17 @@ func (cs *CacheService) GetAvailableHosts(ctx context.Context, req *proto.GetAva
2323
return &proto.GetAvailableHostsResponse{Hosts: protoHosts, Ok: true}, nil
2424
}
2525

26+
func (cs *CacheService) GetRegionConfig(ctx context.Context, req *proto.GetRegionConfigRequest) (*proto.GetRegionConfigResponse, error) {
27+
Logger.Infof("GetRegionConfig[ACK] - [%s]", req.Locality)
28+
29+
config, ok := cs.serverConfig.Regions[req.Locality]
30+
if !ok {
31+
return &proto.GetRegionConfigResponse{Ok: false, ServerConfig: nil}, nil
32+
}
33+
34+
return &proto.GetRegionConfigResponse{Ok: true, ServerConfig: config.ServerConfig.ToProto()}, nil
35+
}
36+
2637
func (cs *CacheService) SetClientLock(ctx context.Context, req *proto.SetClientLockRequest) (*proto.SetClientLockResponse, error) {
2738
Logger.Debugf("SetClientLock[ACK] - [%s]", req.Hash)
2839

0 commit comments

Comments
 (0)