From 88233e5c97842184cb29cad3bc92255ec640d62c Mon Sep 17 00:00:00 2001 From: king_stroke Date: Mon, 22 Sep 2025 20:46:24 -0400 Subject: [PATCH 1/3] Added prometheus integration and monitoring --- src/go.mod | 17 ++++++ src/prometheus.go | 140 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 157 insertions(+) create mode 100644 src/prometheus.go diff --git a/src/go.mod b/src/go.mod index 3e8eb50..0faab9a 100644 --- a/src/go.mod +++ b/src/go.mod @@ -5,7 +5,24 @@ go 1.24.3 require github.com/redis/go-redis/v9 v9.10.0 require ( + github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/go-ole/go-ole v1.2.6 // indirect + github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect + github.com/prometheus/client_golang v1.23.2 // indirect + github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/common v0.66.1 // indirect + github.com/prometheus/procfs v0.16.1 // indirect + github.com/shirou/gopsutil/v3 v3.24.5 // indirect + github.com/shoenig/go-m1cpu v0.1.6 // indirect + github.com/tklauser/go-sysconf v0.3.12 // indirect + github.com/tklauser/numcpus v0.6.1 // indirect + github.com/yusufpapurcu/wmi v1.2.4 // indirect + go.yaml.in/yaml/v2 v2.4.2 // indirect + golang.org/x/sys v0.35.0 // indirect + google.golang.org/protobuf v1.36.8 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/src/prometheus.go b/src/prometheus.go new file mode 100644 index 0000000..1e6b0f0 --- /dev/null +++ b/src/prometheus.go @@ -0,0 +1,140 @@ +package main + +import ( + "context" + "net/http" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + 
"github.com/prometheus/client_golang/prometheus/promhttp" + cpuinfo "github.com/shirou/gopsutil/v3/cpu" + diskinfo "github.com/shirou/gopsutil/v3/disk" + meminfo "github.com/shirou/gopsutil/v3/mem" +) + +func handle() { + http.Handle("/metrics", promhttp.Handler()) + http.ListenAndServe(":2112", nil) +} + +// http metrics +var ( + httpInFlight = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "http_in_flight_requests", + Help: "Current number of in-flight HTTP requests.", + }) + httpRequestsTotal = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "http_requests_total", + Help: "Total HTTP requests by handler/method/status.", + }, []string{"handler", "method", "code"}) + httpRequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "http_request_duration_seconds", + Help: "HTTP request latency in seconds.", + Buckets: prometheus.DefBuckets, + }, []string{"handler", "method", "code"}) +) + +// job metrics +var ( + jobsStarted = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "jobs_started_total", + Help: "Total number of jobs started.", + }, []string{"job_type", "gpu"}) + jobsCompleted = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "jobs_completed_total", + Help: "Total number of jobs completed successfully.", + }, []string{"job_type", "gpu"}) + jobsFailed = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "jobs_failed_total", + Help: "Total number of jobs that failed.", + }, []string{"job_type", "gpu"}) + runningJobs = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "running_jobs", + Help: "Total jobs currently running", + }, []string{"gpu"}) +) + +// system metrics (cpu, memory, disk, etc.) 
+var ( + // cpu (percent of total) + systemCPUPercent = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "system_cpu_usage_percent", + Help: "Host CPU usage percentage (all cores averaged).", + }) + + // memory + systemMemTotalBytes = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "system_memory_total_bytes", + Help: "Host total memory bytes.", + }) + systemMemUsedBytes = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "system_memory_used_bytes", + Help: "Host used memory bytes.", + }) + + // disk per mountpoint + systemDiskTotalBytes = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "system_disk_total_bytes", + Help: "Total disk bytes for a mountpoint.", + }, []string{"mountpoint"}) + + systemDiskUsedBytes = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "system_disk_used_bytes", + Help: "Used disk bytes for a mountpoint.", + }, []string{"mountpoint"}) +) + +// this function may need to run per server to capture local system metrics + +func startSystemCollector(ctx context.Context) { + + go func() { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for { + + select { + case <-ctx.Done(): + return + + case <-ticker.C: + + // cpu + pct, err := cpuinfo.Percent(0, false) + + if err == nil && len(pct) > 0 { + systemCPUPercent.Set(pct[0]) + } else { + // TODO: log err + } + + // memory + m, err := meminfo.VirtualMemory() + if err == nil { + systemMemTotalBytes.Set(float64(m.Total)) + systemMemUsedBytes.Set(float64(m.Used)) + } else { + // TODO: log err + } + + // disk capture + parts, err := diskinfo.Partitions(false) + if err == nil { + for _, p := range parts { + if u, err := diskinfo.Usage(p.Mountpoint); err == nil { + // Use a consistent label key (e.g., mountpoint) + systemDiskTotalBytes.WithLabelValues(u.Path).Set(float64(u.Total)) + systemDiskUsedBytes.WithLabelValues(u.Path).Set(float64(u.Used)) + } else { + // TODO: log err + } + } + } else { + // TODO: log err + } + } + } + }() +} From e8ee41152b03832598ab6f3e0ae08b7c3bde0c14 Mon Sep 
17 00:00:00 2001 From: king_stroke Date: Mon, 22 Sep 2025 20:48:53 -0400 Subject: [PATCH 2/3] Added prometheuss integration and monitoring --- src/go.sum | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/src/go.sum b/src/go.sum index 30e2f56..51e361f 100644 --- a/src/go.sum +++ b/src/go.sum @@ -1,3 +1,5 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= @@ -6,8 +8,46 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= +github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c 
h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw= +github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= +github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= +github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs= +github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= +github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= +github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= github.com/redis/go-redis/v9 v9.10.0 h1:FxwK3eV8p/CQa0Ch276C7u2d0eNC9kCmAYQ7mCXCzVs= github.com/redis/go-redis/v9 v9.10.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= +github.com/shirou/gopsutil/v3 v3.24.5 h1:i0t8kL+kQTvpAYToeuiVk3TgDeKOFioZO3Ztz/iZ9pI= +github.com/shirou/gopsutil/v3 v3.24.5/go.mod h1:bsoOS1aStSs9ErQ1WWfxllSeS1K5D+U30r2NfcubMVk= +github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM= +github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= +github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= +github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= +github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= +github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= +github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= +github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +go.yaml.in/yaml/v2 
v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= +go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= +golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= +golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= +google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= From 4c87de22082757bbbe716632718a56c0918812b2 Mon Sep 17 00:00:00 2001 From: avii778 <142438557+avii778@users.noreply.github.com> Date: Mon, 29 Sep 2025 13:28:19 -0400 Subject: [PATCH 3/3] Fixed issues and added prometheus and grafana setup --- docker-compose.yml | 25 +++++++++++++ prometheus.yml | 9 +++++ src/api.go | 24 +++++++++---- src/int_test.go | 4 +-- src/prometheus.go | 90 +++++++++++++++++++++++++++++++++++++++------- src/supervisor.go | 14 ++++++-- 6 files changed, 142 insertions(+), 24 deletions(-) create mode 100644 prometheus.yml diff --git a/docker-compose.yml b/docker-compose.yml index b400a10..a5445b0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,8 +8,33 @@ services: - redis_data:/data - ./redis.conf:/usr/local/etc/redis/redis.conf:ro 
command: redis-server /usr/local/etc/redis/redis.conf + prometheus: + image: prom/prometheus:latest + ports: ["9090:9090"] + extra_hosts: + - "host.docker.internal:host-gateway" + volumes: + - ./prometheus.yml:/etc/prometheus/prometheus.yml:ro + - prom-data:/prometheus + command: + - --config.file=/etc/prometheus/prometheus.yml + - --storage.tsdb.path=/prometheus + + grafana: + image: grafana/grafana:latest + ports: ["3001:3000"] + environment: + - GF_SECURITY_ADMIN_USER=admin + - GF_SECURITY_ADMIN_PASSWORD=admin + volumes: + - grafana_data:/var/lib/grafana + depends_on: [prometheus] volumes: redis_data: + driver: local + prom-data: + driver: local + grafana_data: driver: local \ No newline at end of file diff --git a/prometheus.yml b/prometheus.yml new file mode 100644 index 0000000..28ed0f7 --- /dev/null +++ b/prometheus.yml @@ -0,0 +1,9 @@ +global: + scrape_interval: 5s + evaluation_interval: 5s + +scrape_configs: + - job_name: "app" + metrics_path: /metrics + static_configs: + - targets: ["host.docker.internal:3000"] diff --git a/src/api.go b/src/api.go index d44428d..bc5b61f 100644 --- a/src/api.go +++ b/src/api.go @@ -12,6 +12,7 @@ import ( "syscall" "time" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/redis/go-redis/v9" ) @@ -22,14 +23,15 @@ type App struct { httpServer *http.Server wg sync.WaitGroup log *slog.Logger + metrics *Metrics } -func NewApp(redisAddr, gpuType string, log *slog.Logger) *App { +func NewApp(redisAddr, gpuType string, log *slog.Logger, metrics *Metrics) *App { client := redis.NewClient(&redis.Options{Addr: redisAddr}) scheduler := NewScheduler(redisAddr, log) consumerID := fmt.Sprintf("worker_%d", os.Getpid()) - supervisor := NewSupervisor(redisAddr, consumerID, gpuType, log) + supervisor := NewSupervisor(redisAddr, consumerID, gpuType, log, metrics) mux := http.NewServeMux() a := &App{ @@ -38,12 +40,15 @@ func NewApp(redisAddr, gpuType string, log *slog.Logger) *App { supervisor: supervisor, httpServer: 
&http.Server{Addr: ":3000", Handler: mux}, log: log, + metrics: metrics, } - mux.HandleFunc("/auth/login", a.login) - mux.HandleFunc("/auth/refresh", a.refresh) - mux.HandleFunc("/jobs", a.enqueueJob) - mux.HandleFunc("/jobs/status", a.getJobStatus) + mux.Handle("/auth/login", a.metrics.WrapHTTP("auth_login", http.HandlerFunc(a.login))) + mux.Handle("/auth/refresh", a.metrics.WrapHTTP("auth_refresh", http.HandlerFunc(a.refresh))) + mux.Handle("/jobs", a.metrics.WrapHTTP("jobs", http.HandlerFunc(a.enqueueJob))) + mux.Handle("/jobs/status", a.metrics.WrapHTTP("jobs_status", http.HandlerFunc(a.getJobStatus))) + + mux.Handle("/metrics", promhttp.Handler()) a.log.Info("new app initialized", "redis_address", redisAddr, "gpu_type", gpuType, "http_address", a.httpServer.Addr) @@ -105,7 +110,9 @@ func main() { fmt.Fprintf(os.Stderr, "failed to create logger: %v\n", err) os.Exit(1) } - app := NewApp("localhost:6379", "AMD", log) + + metrics := NewMetrics() + app := NewApp("localhost:6379", "AMD", log, metrics) if err := app.Start(); err != nil { log.Error("failed to start app", "err", err) @@ -114,6 +121,9 @@ func main() { ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() + + go app.metrics.StartCollecting(ctx) + <-ctx.Done() log.Info("shutdown signal received") diff --git a/src/int_test.go b/src/int_test.go index dac0636..1d30ec8 100644 --- a/src/int_test.go +++ b/src/int_test.go @@ -33,9 +33,9 @@ func TestIntegration(t *testing.T) { fmt.Fprintf(os.Stderr, "failed to create logger: %v\n", err) os.Exit(1) } - + metrics := NewMetrics() consumerID := fmt.Sprintf("worker_%d", os.Getpid()) - supervisor := NewSupervisor(redisAddr, consumerID, "AMD", supervisorLog) + supervisor := NewSupervisor(redisAddr, consumerID, "AMD", supervisorLog, metrics) if err := supervisor.Start(); err != nil { t.Errorf("Failed to start supervisor: %v", err) diff --git a/src/prometheus.go b/src/prometheus.go index 1e6b0f0..a5cffc7 100644 --- 
a/src/prometheus.go +++ b/src/prometheus.go @@ -2,6 +2,7 @@ package main import ( "context" + "log/slog" "net/http" "time" @@ -13,11 +14,6 @@ import ( meminfo "github.com/shirou/gopsutil/v3/mem" ) -func handle() { - http.Handle("/metrics", promhttp.Handler()) - http.ListenAndServe(":2112", nil) -} - // http metrics var ( httpInFlight = promauto.NewGauge(prometheus.GaugeOpts{ @@ -49,7 +45,7 @@ var ( Name: "jobs_failed_total", Help: "Total number of jobs that failed.", }, []string{"job_type", "gpu"}) - runningJobs = promauto.NewCounterVec(prometheus.CounterOpts{ + runningJobs = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: "running_jobs", Help: "Total jobs currently running", }, []string{"gpu"}) @@ -85,8 +81,78 @@ var ( }, []string{"mountpoint"}) ) -// this function may need to run per server to capture local system metrics +type Metrics struct { + HTTPInFlight prometheus.Gauge + HTTPRequestsTotal *prometheus.CounterVec + HTTPRequestDuration *prometheus.HistogramVec + + JobsStarted *prometheus.CounterVec + JobsCompleted *prometheus.CounterVec + JobsFailed *prometheus.CounterVec + RunningJobs *prometheus.GaugeVec + + SystemCPUPercent prometheus.Gauge + SystemMemTotalBytes prometheus.Gauge + SystemMemUsedBytes prometheus.Gauge + SystemDiskTotalBytes *prometheus.GaugeVec + SystemDiskUsedBytes *prometheus.GaugeVec +} + +func NewMetrics() *Metrics { + return &Metrics{ + HTTPInFlight: httpInFlight, + HTTPRequestsTotal: httpRequestsTotal, + HTTPRequestDuration: httpRequestDuration, + + JobsStarted: jobsStarted, + JobsCompleted: jobsCompleted, + JobsFailed: jobsFailed, + RunningJobs: runningJobs, + + SystemCPUPercent: systemCPUPercent, + SystemMemTotalBytes: systemMemTotalBytes, + SystemMemUsedBytes: systemMemUsedBytes, + SystemDiskTotalBytes: systemDiskTotalBytes, + SystemDiskUsedBytes: systemDiskUsedBytes, + } +} + +func (m *Metrics) StartCollecting(ctx context.Context) { + startSystemCollector(ctx) + +} + +// Wrap http wraps around the http handlers to collect metrics 
+func (m *Metrics) WrapHTTP(name string, next http.Handler) http.Handler { + return promhttp.InstrumentHandlerInFlight( + httpInFlight, + promhttp.InstrumentHandlerDuration( + httpRequestDuration.MustCurryWith(prometheus.Labels{"handler": name}), + promhttp.InstrumentHandlerCounter( + httpRequestsTotal.MustCurryWith(prometheus.Labels{"handler": name}), + next, + ), + ), + ) +} + +func (m *Metrics) TrackJob(ctx context.Context, jobType, gpu string, fn func(context.Context) error) error { + + jobsStarted.WithLabelValues(jobType, gpu).Inc() + runningJobs.WithLabelValues(gpu).Inc() + defer runningJobs.WithLabelValues(gpu).Dec() + err := fn(ctx) + + if err != nil { + jobsFailed.WithLabelValues(jobType, gpu).Inc() + return err + } + jobsCompleted.WithLabelValues(jobType, gpu).Inc() + return nil +} + +// Collects metrics from the host where this process is running, these values reflect the local machine func startSystemCollector(ctx context.Context) { go func() { @@ -107,7 +173,7 @@ func startSystemCollector(ctx context.Context) { if err == nil && len(pct) > 0 { systemCPUPercent.Set(pct[0]) } else { - // TODO: log err + slog.Error("failed to get cpu percent", "err", err) } // memory @@ -116,7 +182,7 @@ func startSystemCollector(ctx context.Context) { systemMemTotalBytes.Set(float64(m.Total)) systemMemUsedBytes.Set(float64(m.Used)) } else { - // TODO: log err + slog.Error("failed to get memory info", "err", err) } // disk capture @@ -124,15 +190,15 @@ func startSystemCollector(ctx context.Context) { if err == nil { for _, p := range parts { if u, err := diskinfo.Usage(p.Mountpoint); err == nil { - // Use a consistent label key (e.g., mountpoint) + systemDiskTotalBytes.WithLabelValues(u.Path).Set(float64(u.Total)) systemDiskUsedBytes.WithLabelValues(u.Path).Set(float64(u.Used)) } else { - // TODO: log err + slog.Error("failed to get disk usage", "mountpoint", p.Mountpoint, "err", err) } } } else { - // TODO: log err + slog.Error("failed to get disk partitions", "err", err) } } } diff --git
a/src/supervisor.go b/src/supervisor.go index 5c5756b..c1e49ed 100644 --- a/src/supervisor.go +++ b/src/supervisor.go @@ -20,9 +20,10 @@ type Supervisor struct { gpuType string wg sync.WaitGroup log *slog.Logger + metrics *Metrics } -func NewSupervisor(redisAddr, consumerID, gpuType string, log *slog.Logger) *Supervisor { +func NewSupervisor(redisAddr, consumerID, gpuType string, log *slog.Logger, metrics *Metrics) *Supervisor { client := redis.NewClient(&redis.Options{ Addr: redisAddr, }) @@ -36,6 +37,7 @@ func NewSupervisor(redisAddr, consumerID, gpuType string, log *slog.Logger) *Sup consumerID: consumerID, gpuType: gpuType, log: log, + metrics: metrics, } } @@ -126,9 +128,15 @@ func (s *Supervisor) handleMessage(message redis.XMessage) { s.log.Info("processing job", "job_id", job.ID, "job_type", job.Type) // Simulate job processing - success := s.processJob(job) + gpuLabel := s.gpuType // e.g. "AMD" or "NVIDIA" + err := s.metrics.TrackJob(context.Background(), job.Type, gpuLabel, func(ctx context.Context) error { + if s.processJob(job) { + return nil + } + return fmt.Errorf("job failed") + }) - if success { + if err == nil { s.ackMessage(message.ID) s.log.Info("job completed successfully", "job_id", job.ID) } else {