Skip to content

Commit 8e4bd91

Browse files
committed
metrics
1 parent 43b2b70 commit 8e4bd91

File tree

5 files changed

+231
-14
lines changed

5 files changed

+231
-14
lines changed

handler/health.go

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/gofiber/fiber/v2"
88
"github.com/minio/minio-go/v7"
9+
"github.com/mstgnz/cdn/pkg/observability"
910
"github.com/mstgnz/cdn/service"
1011
)
1112

@@ -38,9 +39,9 @@ func (h *HealthChecker) HealthCheck(c *fiber.Ctx) error {
3839
c.Status(fiber.StatusServiceUnavailable)
3940
}
4041

41-
status := map[string]interface{}{
42+
status := map[string]any{
4243
"status": overallStatus,
43-
"services": map[string]interface{}{
44+
"services": map[string]any{
4445
"minio": minioHealth,
4546
"aws": awsHealth,
4647
"cache": cacheHealth,
@@ -52,32 +53,60 @@ func (h *HealthChecker) HealthCheck(c *fiber.Ctx) error {
5253
}
5354

5455
func (h *HealthChecker) checkMinioHealth(ctx context.Context) string {
56+
start := time.Now()
57+
defer func() {
58+
duration := time.Since(start).Seconds()
59+
observability.ServiceHealthCheckDuration.WithLabelValues("minio").Observe(duration)
60+
observability.LastHealthCheckTimestamp.WithLabelValues("minio").Set(float64(time.Now().Unix()))
61+
}()
62+
5563
if _, err := h.minioClient.ListBuckets(ctx); err != nil {
64+
observability.ServiceHealth.WithLabelValues("minio").Set(0)
5665
return "unhealthy: " + err.Error()
5766
}
67+
observability.ServiceHealth.WithLabelValues("minio").Set(1)
5868
return "healthy"
5969
}
6070

6171
func (h *HealthChecker) checkAwsHealth(ctx context.Context) string {
72+
start := time.Now()
73+
defer func() {
74+
duration := time.Since(start).Seconds()
75+
observability.ServiceHealthCheckDuration.WithLabelValues("aws").Observe(duration)
76+
observability.LastHealthCheckTimestamp.WithLabelValues("aws").Set(float64(time.Now().Unix()))
77+
}()
78+
6279
if _, err := h.awsService.ListBuckets(); err != nil {
80+
observability.ServiceHealth.WithLabelValues("aws").Set(0)
6381
return "unhealthy: " + err.Error()
6482
}
83+
observability.ServiceHealth.WithLabelValues("aws").Set(1)
6584
return "healthy"
6685
}
6786

6887
func (h *HealthChecker) checkCacheHealth(ctx context.Context) string {
88+
start := time.Now()
89+
defer func() {
90+
duration := time.Since(start).Seconds()
91+
observability.ServiceHealthCheckDuration.WithLabelValues("cache").Observe(duration)
92+
observability.LastHealthCheckTimestamp.WithLabelValues("cache").Set(float64(time.Now().Unix()))
93+
}()
94+
6995
testKey := "health:test"
7096
testValue := []byte("test")
7197

7298
// Try to set a test value
7399
if err := h.cache.Set(testKey, testValue, time.Second); err != nil {
100+
observability.ServiceHealth.WithLabelValues("cache").Set(0)
74101
return "unhealthy: set failed - " + err.Error()
75102
}
76103

77104
// Try to get the test value
78105
if _, err := h.cache.Get(testKey); err != nil {
106+
observability.ServiceHealth.WithLabelValues("cache").Set(0)
79107
return "unhealthy: get failed - " + err.Error()
80108
}
81109

110+
observability.ServiceHealth.WithLabelValues("cache").Set(1)
82111
return "healthy"
83112
}

pkg/batch/processor.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
// BatchItem represents a single item in a batch
1313
type BatchItem struct {
1414
ID string
15-
Data interface{}
15+
Data any
1616
Error error
1717
Success bool
1818
}
@@ -132,26 +132,35 @@ func (b *BatchProcessor) processBatchWithRetry(items []BatchItem) {
132132

133133
var processed []BatchItem
134134
retries := 0
135+
start := time.Now()
135136

136137
for retries <= b.config.MaxRetries {
137-
start := time.Now()
138138
processed = b.processor(items)
139139
duration := time.Since(start).Seconds()
140140

141-
observability.StorageOperationDuration.WithLabelValues("batch_process", "bulk").Observe(duration)
142-
143141
failed := 0
142+
success := 0
144143
for _, item := range processed {
145-
if !item.Success {
144+
if item.Success {
145+
success++
146+
} else {
146147
failed++
147148
}
148149
}
149150

151+
// Update metrics
152+
observability.BatchProcessingDuration.WithLabelValues("success").Observe(duration)
153+
observability.BatchItemsProcessed.WithLabelValues("success").Add(float64(success))
154+
observability.BatchItemsProcessed.WithLabelValues("failed").Add(float64(failed))
155+
observability.BatchProcessorQueueSize.Set(float64(len(b.items)))
156+
150157
if failed == 0 {
151158
break
152159
}
153160

154161
retries++
162+
observability.BatchRetries.Inc()
163+
155164
if retries <= b.config.MaxRetries {
156165
b.logger.Warn().
157166
Int("retry", retries).

pkg/observability/metrics.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,130 @@ var (
4545
},
4646
[]string{"operation", "provider"},
4747
)
48+
49+
// Health Check Metrics
50+
ServiceHealth = promauto.NewGaugeVec(
51+
prometheus.GaugeOpts{
52+
Name: "service_health_status",
53+
Help: "Current health status of services (1 for healthy, 0 for unhealthy)",
54+
},
55+
[]string{"service"},
56+
)
57+
58+
ServiceHealthCheckDuration = promauto.NewHistogramVec(
59+
prometheus.HistogramOpts{
60+
Name: "service_health_check_duration_seconds",
61+
Help: "Duration of health checks in seconds",
62+
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5},
63+
},
64+
[]string{"service"},
65+
)
66+
67+
LastHealthCheckTimestamp = promauto.NewGaugeVec(
68+
prometheus.GaugeOpts{
69+
Name: "service_last_health_check_timestamp",
70+
Help: "Timestamp of the last health check",
71+
},
72+
[]string{"service"},
73+
)
74+
75+
// Worker Pool Metrics
76+
WorkerPoolQueueSize = promauto.NewGauge(
77+
prometheus.GaugeOpts{
78+
Name: "worker_pool_queue_size",
79+
Help: "Current number of jobs in the worker pool queue",
80+
},
81+
)
82+
83+
WorkerPoolActiveWorkers = promauto.NewGauge(
84+
prometheus.GaugeOpts{
85+
Name: "worker_pool_active_workers",
86+
Help: "Current number of active workers in the pool",
87+
},
88+
)
89+
90+
WorkerJobProcessingDuration = promauto.NewHistogramVec(
91+
prometheus.HistogramOpts{
92+
Name: "worker_job_processing_duration_seconds",
93+
Help: "Duration of job processing in seconds",
94+
Buckets: []float64{.001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5},
95+
},
96+
[]string{"status"},
97+
)
98+
99+
WorkerJobRetries = promauto.NewCounterVec(
100+
prometheus.CounterOpts{
101+
Name: "worker_job_retries_total",
102+
Help: "Total number of job retries",
103+
},
104+
[]string{"job_type"},
105+
)
106+
107+
// Batch Processor Metrics
108+
BatchProcessorQueueSize = promauto.NewGauge(
109+
prometheus.GaugeOpts{
110+
Name: "batch_processor_queue_size",
111+
Help: "Current number of items in the batch processor queue",
112+
},
113+
)
114+
115+
BatchProcessingDuration = promauto.NewHistogramVec(
116+
prometheus.HistogramOpts{
117+
Name: "batch_processing_duration_seconds",
118+
Help: "Duration of batch processing in seconds",
119+
Buckets: []float64{.01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
120+
},
121+
[]string{"status"},
122+
)
123+
124+
BatchItemsProcessed = promauto.NewCounterVec(
125+
prometheus.CounterOpts{
126+
Name: "batch_items_processed_total",
127+
Help: "Total number of items processed by the batch processor",
128+
},
129+
[]string{"status"},
130+
)
131+
132+
BatchRetries = promauto.NewCounter(
133+
prometheus.CounterOpts{
134+
Name: "batch_retries_total",
135+
Help: "Total number of batch retries",
136+
},
137+
)
138+
139+
// Cache Metrics
140+
CacheOperations = promauto.NewCounterVec(
141+
prometheus.CounterOpts{
142+
Name: "cache_operations_total",
143+
Help: "Total number of cache operations",
144+
},
145+
[]string{"operation", "status"},
146+
)
147+
148+
CacheHitRatio = promauto.NewGaugeVec(
149+
prometheus.GaugeOpts{
150+
Name: "cache_hit_ratio",
151+
Help: "Cache hit ratio for different operations",
152+
},
153+
[]string{"operation"},
154+
)
155+
156+
CacheOperationDuration = promauto.NewHistogramVec(
157+
prometheus.HistogramOpts{
158+
Name: "cache_operation_duration_seconds",
159+
Help: "Duration of cache operations in seconds",
160+
Buckets: []float64{.001, .005, .01, .025, .05, .1, .25, .5},
161+
},
162+
[]string{"operation", "status"},
163+
)
164+
165+
CacheSize = promauto.NewGaugeVec(
166+
prometheus.GaugeOpts{
167+
Name: "cache_size_bytes",
168+
Help: "Size of cached data in bytes",
169+
},
170+
[]string{"type"},
171+
)
48172
)
49173

50174
// MetricsHandler HTTP handler for Prometheus metrics

pkg/worker/pool.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package worker
33
import (
44
"context"
55
"fmt"
6-
"strconv"
76
"sync"
87
"time"
98

@@ -115,22 +114,25 @@ func (p *Pool) worker(id int) {
115114
return
116115
}
117116

117+
observability.WorkerPoolActiveWorkers.Inc()
118+
defer observability.WorkerPoolActiveWorkers.Dec()
119+
118120
var err error
119121
retries := 0
122+
start := time.Now()
120123

121124
for retries <= p.maxRetries {
122-
start := time.Now()
123125
err = job.Task()
124126
duration := time.Since(start).Seconds()
125127

126-
// Record metrics
127-
observability.ImageProcessingDuration.WithLabelValues("worker_" + strconv.Itoa(id)).Observe(duration)
128-
129128
if err == nil {
129+
observability.WorkerJobProcessingDuration.WithLabelValues("success").Observe(duration)
130130
break
131131
}
132132

133133
retries++
134+
observability.WorkerJobRetries.WithLabelValues("image_processing").Inc()
135+
134136
p.logger.Error().
135137
Err(err).
136138
Str("jobID", job.ID).
@@ -142,10 +144,16 @@ func (p *Pool) worker(id int) {
142144
time.Sleep(p.retryDelay)
143145
continue
144146
}
147+
148+
observability.WorkerJobProcessingDuration.WithLabelValues("failure").Observe(duration)
145149
}
146150

147151
job.Response <- err
148152

153+
// Update queue size metric
154+
queueSize := float64(len(p.jobQueue))
155+
observability.WorkerPoolQueueSize.Set(queueSize)
156+
149157
case <-p.ctx.Done():
150158
return
151159
}

0 commit comments

Comments
 (0)