Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,33 @@ services:
- redis_data:/data
- ./redis.conf:/usr/local/etc/redis/redis.conf:ro
command: redis-server /usr/local/etc/redis/redis.conf
prometheus:
image: prom/prometheus:latest
ports: ["9090:9090"]
extra_hosts:
- "host.docker.internal:host-gateway"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml:ro
- prom-data:/prometheus
command:
- --config.file=/etc/prometheus/prometheus.yml
- --storage.tsdb.path=/prometheus

grafana:
image: grafana/grafana:latest
ports: ["3001:3000"]
environment:
- GF_SECURITY_ADMIN_USER=admin
- GF_SECURITY_ADMIN_PASSWORD=admin
volumes:
- grafana_data:/var/lib/grafana
depends_on: [prometheus]


volumes:
redis_data:
driver: local
prom-data:
driver: local
grafana_data:
driver: local
9 changes: 9 additions & 0 deletions prometheus.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
global:
scrape_interval: 5s
evaluation_interval: 5s

scrape_configs:
- job_name: "app"
metrics_path: /metrics
static_configs:
- targets: ["host.docker.internal:3000"]
52 changes: 30 additions & 22 deletions src/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,44 +14,47 @@ import (
"syscall"
"time"

"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/redis/go-redis/v9"
)

type App struct {
redisClient *redis.Client
scheduler *Scheduler
supervisor *Supervisor
httpServer *http.Server
wg sync.WaitGroup
log *slog.Logger
redisClient *redis.Client
scheduler *Scheduler
supervisor *Supervisor
httpServer *http.Server
wg sync.WaitGroup
log *slog.Logger
metrics *Metrics
statusRegistry *StatusRegistry
}

func NewApp(redisAddr, gpuType string, log *slog.Logger) *App {
func NewApp(redisAddr, gpuType string, log *slog.Logger, metrics *Metrics) *App {
client := redis.NewClient(&redis.Options{Addr: redisAddr})
scheduler := NewScheduler(redisAddr, log)
statusRegistry := NewStatusRegistry(client, log)

consumerID := fmt.Sprintf("worker_%d", os.Getpid())
supervisor := NewSupervisor(redisAddr, consumerID, gpuType, log)
supervisor := NewSupervisor(redisAddr, consumerID, gpuType, log, metrics)

mux := http.NewServeMux()
a := &App{
redisClient: client,
scheduler: scheduler,
supervisor: supervisor,
httpServer: &http.Server{Addr: ":3000", Handler: mux},
log: log,
statusRegistry: statusRegistry,
redisClient: client,
scheduler: scheduler,
supervisor: supervisor,
httpServer: &http.Server{Addr: ":3000", Handler: mux},
log: log,
metrics: metrics,
statusRegistry: statusRegistry,
}

mux.HandleFunc("/auth/login", a.login)
mux.HandleFunc("/auth/refresh", a.refresh)
mux.HandleFunc("/jobs", a.enqueueJob)
mux.HandleFunc("/jobs/status", a.getJobStatus)
mux.HandleFunc("/supervisors/status", a.getSupervisorStatus)
mux.HandleFunc("/supervisors/status/", a.getSupervisorStatusByID)
mux.HandleFunc("/supervisors", a.getAllSupervisors)
mux.Handle("/auth/login", a.metrics.WrapHTTP("auth_login", http.HandlerFunc(a.login)))
mux.Handle("/auth/refresh", a.metrics.WrapHTTP("auth_refresh", http.HandlerFunc(a.refresh)))
mux.Handle("/jobs", a.metrics.WrapHTTP("jobs", http.HandlerFunc(a.enqueueJob)))
mux.Handle("/jobs/status", a.metrics.WrapHTTP("jobs_status", http.HandlerFunc(a.getJobStatus)))
mux.Handle("/supervisors/status", a.metrics.WrapHTTP("supervisors_status", http.HandlerFunc(a.getSupervisorStatus)))
mux.Handle("/supervisors/status/", a.metrics.WrapHTTP("supervisors_status_by_id", http.HandlerFunc(a.getSupervisorStatusByID)))
mux.Handle("/supervisors", a.metrics.WrapHTTP("supervisors", http.HandlerFunc(a.getAllSupervisors)))

a.log.Info("new app initialized", "redis_address", redisAddr,
"gpu_type", gpuType, "http_address", a.httpServer.Addr)
Expand Down Expand Up @@ -119,7 +122,9 @@ func main() {
fmt.Fprintf(os.Stderr, "failed to create logger: %v\n", err)
os.Exit(1)
}
app := NewApp("localhost:6379", "AMD", log)

metrics := NewMetrics()
app := NewApp("localhost:6379", "AMD", log, metrics)

if err := app.Start(); err != nil {
log.Error("failed to start app", "err", err)
Expand All @@ -128,6 +133,9 @@ func main() {

ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()

go app.metrics.StartCollecting(ctx)

<-ctx.Done()
log.Info("shutdown signal received")

Expand Down
17 changes: 17 additions & 0 deletions src/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,24 @@ go 1.24.3
require github.com/redis/go-redis/v9 v9.10.0

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus/client_golang v1.23.2 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.66.1 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
github.com/shirou/gopsutil/v3 v3.24.5 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.yaml.in/yaml/v2 v2.4.2 // indirect
golang.org/x/sys v0.35.0 // indirect
google.golang.org/protobuf v1.36.8 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
40 changes: 40 additions & 0 deletions src/go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
Expand All @@ -6,8 +8,46 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o=
github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg=
github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk=
github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE=
github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs=
github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA=
github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg=
github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is=
github.com/redis/go-redis/v9 v9.10.0 h1:FxwK3eV8p/CQa0Ch276C7u2d0eNC9kCmAYQ7mCXCzVs=
github.com/redis/go-redis/v9 v9.10.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
github.com/shirou/gopsutil/v3 v3.24.5 h1:i0t8kL+kQTvpAYToeuiVk3TgDeKOFioZO3Ztz/iZ9pI=
github.com/shirou/gopsutil/v3 v3.24.5/go.mod h1:bsoOS1aStSs9ErQ1WWfxllSeS1K5D+U30r2NfcubMVk=
github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM=
github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ=
github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU=
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI=
go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc=
google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
4 changes: 2 additions & 2 deletions src/int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ func TestIntegration(t *testing.T) {
fmt.Fprintf(os.Stderr, "failed to create logger: %v\n", err)
os.Exit(1)
}

metrics := NewMetrics()
consumerID := fmt.Sprintf("worker_%d", os.Getpid())
supervisor := NewSupervisor(redisAddr, consumerID, "AMD", supervisorLog)
supervisor := NewSupervisor(redisAddr, consumerID, "AMD", supervisorLog, metrics)

if err := supervisor.Start(); err != nil {
t.Errorf("Failed to start supervisor: %v", err)
Expand Down
Loading