Skip to content

Commit 3a130ac

Browse files
authored
instrument with victoria metrics pusher (#59)
* instrument with victoria metrics pusher * add memory stats * add the label on all metrics being output
1 parent 3d0f83e commit 3a130ac

File tree

5 files changed

+149
-44
lines changed

5 files changed

+149
-44
lines changed

go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/beam-cloud/blobcache-v2
33
go 1.22.10
44

55
require (
6+
github.com/VictoriaMetrics/metrics v1.37.0
67
github.com/aws/aws-sdk-go-v2 v1.30.1
78
github.com/aws/aws-sdk-go-v2/config v1.27.24
89
github.com/aws/aws-sdk-go-v2/credentials v1.17.24
@@ -50,6 +51,8 @@ require (
5051
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
5152
github.com/rogpeppe/go-internal v1.11.0 // indirect
5253
github.com/stretchr/testify v1.9.0 // indirect
54+
github.com/valyala/fastrand v1.1.0 // indirect
55+
github.com/valyala/histogram v1.2.0 // indirect
5356
)
5457

5558
require (

go.sum

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
github.com/VictoriaMetrics/metrics v1.37.0 h1:u5Yr+HFofQyn7kgmmkufgkX0nEA6G1oEyK2eaKsVaUM=
2+
github.com/VictoriaMetrics/metrics v1.37.0/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8=
13
github.com/aws/aws-sdk-go-v2 v1.30.1 h1:4y/5Dvfrhd1MxRDD77SrfsDaj8kUkkljU7XE83NPV+o=
24
github.com/aws/aws-sdk-go-v2 v1.30.1/go.mod h1:nIQjQVp5sfpQcTc9mPSr1B0PaWK5ByX9MOoDadSN4lc=
35
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 h1:zAybnyUQXIZ5mok5Jqwlf58/TFE7uvd3IAsa1aF9cXs=
@@ -142,6 +144,10 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
142144
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
143145
github.com/tj/assert v0.0.3 h1:Df/BlaZ20mq6kuai7f5z2TvPFiwC3xaWJSDQNiIS3Rk=
144146
github.com/tj/assert v0.0.3/go.mod h1:Ne6X72Q+TB1AteidzQncjw9PabbMp4PBMZ1k+vd1Pvk=
147+
github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8=
148+
github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ=
149+
github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ=
150+
github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY=
145151
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
146152
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
147153
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=

pkg/metrics.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package blobcache
2+
3+
import (
4+
"context"
5+
"encoding/base64"
6+
"fmt"
7+
"time"
8+
9+
"github.com/VictoriaMetrics/metrics"
10+
)
11+
12+
type BlobcacheMetrics struct {
13+
DiskCacheUsageMB *metrics.Histogram
14+
DiskCacheUsagePct *metrics.Histogram
15+
MemCacheUsageMB *metrics.Histogram
16+
MemCacheUsagePct *metrics.Histogram
17+
}
18+
19+
func initMetrics(ctx context.Context, config BlobCacheMetricsConfig, currentHost *BlobCacheHost) BlobcacheMetrics {
20+
username := config.Username
21+
password := config.Password
22+
credentials := base64.StdEncoding.EncodeToString([]byte(username + ":" + password))
23+
24+
opts := &metrics.PushOptions{
25+
Headers: []string{
26+
fmt.Sprintf("Authorization: Basic %s", credentials),
27+
},
28+
ExtraLabels: "host=" + currentHost.HostId,
29+
}
30+
31+
pushURL := config.URL
32+
interval := time.Duration(config.PushIntervalS) * time.Second
33+
pushProcessMetrics := true
34+
35+
err := metrics.InitPushWithOptions(ctx, pushURL, interval, pushProcessMetrics, opts)
36+
if err != nil {
37+
Logger.Errorf("Failed to initialize metrics: %v", err)
38+
}
39+
40+
diskCacheUsageMB := metrics.NewHistogram(`blobcache_disk_cache_usage_mb`)
41+
diskCacheUsagePct := metrics.NewHistogram(`blobcache_disk_cache_usage_pct`)
42+
memCacheUsageMB := metrics.NewHistogram(`blobcache_mem_cache_usage_mb`)
43+
memCacheUsagePct := metrics.NewHistogram(`blobcache_mem_cache_usage_pct`)
44+
45+
return BlobcacheMetrics{
46+
DiskCacheUsageMB: diskCacheUsageMB,
47+
DiskCacheUsagePct: diskCacheUsagePct,
48+
MemCacheUsageMB: memCacheUsageMB,
49+
MemCacheUsagePct: memCacheUsagePct,
50+
}
51+
}

pkg/storage.go

Lines changed: 78 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type ContentAddressableStorage struct {
3232
diskCacheDir string
3333
diskCachedUsageExceeded bool
3434
mu sync.Mutex
35+
metrics BlobcacheMetrics
3536
}
3637

3738
func NewContentAddressableStorage(ctx context.Context, currentHost *BlobCacheHost, locality string, coordinator CoordinatorClient, config BlobCacheConfig) (*ContentAddressableStorage, error) {
@@ -48,15 +49,16 @@ func NewContentAddressableStorage(ctx context.Context, currentHost *BlobCacheHos
4849
locality: locality,
4950
diskCacheDir: config.Server.DiskCacheDir,
5051
mu: sync.Mutex{},
52+
metrics: initMetrics(ctx, config.Metrics, currentHost),
5153
}
5254

5355
Logger.Infof("Disk cache directory located at: '%s'", cas.diskCacheDir)
5456

55-
availableMemoryMb := getAvailableMemoryMb()
56-
maxCacheSizeMb := (availableMemoryMb * cas.serverConfig.MaxCachePct) / 100
57+
_, totalMemoryMb := getMemoryMb()
58+
maxCacheSizeMb := (totalMemoryMb * cas.serverConfig.MaxCachePct) / 100
5759
maxCost := maxCacheSizeMb * 1e6
5860

59-
Logger.Infof("Total available memory: %dMB", availableMemoryMb)
61+
Logger.Infof("Total available memory: %dMB", totalMemoryMb)
6062
Logger.Infof("Max cache size: %dMB", maxCacheSizeMb)
6163
Logger.Infof("Max cost: %d", maxCost)
6264

@@ -75,22 +77,13 @@ func NewContentAddressableStorage(ctx context.Context, currentHost *BlobCacheHos
7577
return nil, err
7678
}
7779

78-
cas.cache = cache
79-
cas.maxCacheSizeMb = maxCacheSizeMb
80-
8180
go cas.monitorDiskCacheUsage()
8281

82+
cas.cache = cache
83+
cas.maxCacheSizeMb = maxCacheSizeMb
8384
return cas, nil
8485
}
8586

86-
func getAvailableMemoryMb() int64 {
87-
v, err := mem.VirtualMemory()
88-
if err != nil {
89-
log.Fatalf("Unable to retrieve host memory info: %v", err)
90-
}
91-
return int64(v.Total / (1024 * 1024))
92-
}
93-
9487
type cacheValue struct {
9588
Hash string
9689
Content []byte
@@ -286,39 +279,42 @@ func (cas *ContentAddressableStorage) Cleanup() {
286279
cas.cache.Close()
287280
}
288281

289-
func min(a, b int64) int64 {
290-
if a < b {
291-
return a
282+
func (cas *ContentAddressableStorage) GetDiskCacheMetrics() (int64, int64, float64, error) {
283+
var (
284+
diskUsageMb int64
285+
totalDiskSpaceMb int64
286+
usagePercentage float64
287+
err error
288+
)
289+
290+
// Get current disk usage
291+
diskUsageMb, err = getDiskUsageMb(cas.diskCacheDir)
292+
if err != nil {
293+
return 0, 0, 0, err
292294
}
293-
return b
294-
}
295295

296-
func (cas *ContentAddressableStorage) monitorDiskCacheUsage() {
297-
ticker := time.NewTicker(diskCacheUsageCheckInterval)
298-
defer ticker.Stop()
296+
// Get total disk capacity
297+
totalDiskSpaceMb, err = getTotalDiskSpaceMb(cas.diskCacheDir)
298+
if err != nil {
299+
return 0, 0, 0, err
300+
}
299301

300-
for {
301-
select {
302-
case <-cas.ctx.Done():
303-
return
304-
case <-ticker.C:
305-
usage, err := getDiskUsageMb(cas.diskCacheDir)
306-
if err == nil {
307-
totalDiskSpace, err := getTotalDiskSpaceMb(cas.diskCacheDir)
308-
if err == nil {
309-
usagePct := float64(usage) / float64(totalDiskSpace)
310-
311-
Logger.Infof("Disk cache usage: %dMB / %dMB (%.2f%%)", usage, totalDiskSpace, usagePct*100)
312-
313-
cas.mu.Lock()
314-
cas.diskCachedUsageExceeded = usagePct > cas.serverConfig.DiskCacheMaxUsagePct
315-
cas.mu.Unlock()
316-
}
317-
}
318-
}
302+
// Calculate usage percentage
303+
if totalDiskSpaceMb > 0 {
304+
usagePercentage = (float64(diskUsageMb) / float64(totalDiskSpaceMb)) * 100
305+
} else {
306+
usagePercentage = 0
319307
}
308+
309+
return diskUsageMb, totalDiskSpaceMb, usagePercentage, nil
320310
}
321311

312+
func min(a, b int64) int64 {
313+
if a < b {
314+
return a
315+
}
316+
return b
317+
}
322318
func getDiskUsageMb(path string) (int64, error) {
323319
var totalUsage int64 = 0
324320
err := filepath.Walk(path, func(path string, info os.FileInfo, err error) error {
@@ -344,3 +340,44 @@ func getTotalDiskSpaceMb(path string) (int64, error) {
344340
}
345341
return int64(stat.Blocks) * int64(stat.Bsize) / (1024 * 1024), nil
346342
}
343+
344+
func getMemoryMb() (int64, int64) {
345+
v, err := mem.VirtualMemory()
346+
if err != nil {
347+
log.Fatalf("Unable to retrieve host memory info: %v", err)
348+
}
349+
return int64(v.Available / (1024 * 1024)), int64(v.Total / (1024 * 1024))
350+
}
351+
352+
func (cas *ContentAddressableStorage) monitorDiskCacheUsage() {
353+
ticker := time.NewTicker(diskCacheUsageCheckInterval)
354+
defer ticker.Stop()
355+
356+
for {
357+
select {
358+
case <-cas.ctx.Done():
359+
return
360+
case <-ticker.C:
361+
currentUsage, totalDiskSpace, usagePercentage, err := cas.GetDiskCacheMetrics()
362+
if err != nil {
363+
Logger.Errorf("Failed to fetch disk cache metrics: %v", err)
364+
continue
365+
}
366+
367+
availableMemoryMb, totalMemoryMb := getMemoryMb()
368+
usedMemoryMb := totalMemoryMb - availableMemoryMb
369+
cas.metrics.MemCacheUsageMB.Update(float64(usedMemoryMb))
370+
cas.metrics.MemCacheUsagePct.Update(float64(usedMemoryMb) / float64(totalMemoryMb) * 100)
371+
cas.metrics.DiskCacheUsageMB.Update(float64(currentUsage))
372+
cas.metrics.DiskCacheUsagePct.Update(float64(usagePercentage))
373+
374+
Logger.Infof("Memory Cache Usage: %dMB / %dMB (%.2f%%)", availableMemoryMb, totalMemoryMb, float64(availableMemoryMb)/float64(totalMemoryMb)*100)
375+
Logger.Infof("Disk Cache Usage: %dMB / %dMB (%.2f%%)", currentUsage, totalDiskSpace, usagePercentage)
376+
377+
// Update internal state for disk usage exceeded
378+
cas.mu.Lock()
379+
cas.diskCachedUsageExceeded = usagePercentage > cas.serverConfig.DiskCacheMaxUsagePct
380+
cas.mu.Unlock()
381+
}
382+
}
383+
}

pkg/types.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,17 @@ const (
1919
)
2020

2121
type BlobCacheConfig struct {
22-
Server BlobCacheServerConfig `key:"server" json:"server"`
23-
Client BlobCacheClientConfig `key:"client" json:"client"`
24-
Global BlobCacheGlobalConfig `key:"global" json:"global"`
22+
Server BlobCacheServerConfig `key:"server" json:"server"`
23+
Client BlobCacheClientConfig `key:"client" json:"client"`
24+
Global BlobCacheGlobalConfig `key:"global" json:"global"`
25+
Metrics BlobCacheMetricsConfig `key:"metrics" json:"metrics"`
26+
}
27+
28+
type BlobCacheMetricsConfig struct {
29+
PushIntervalS int `key:"pushIntervalS" json:"push_interval_s"`
30+
URL string `key:"url" json:"url"`
31+
Username string `key:"username" json:"username"`
32+
Password string `key:"password" json:"password"`
2533
}
2634

2735
type BlobCacheGlobalConfig struct {

0 commit comments

Comments
 (0)