diff --git a/src/go.mod b/src/go.mod index b0ad159..4bfcea8 100644 --- a/src/go.mod +++ b/src/go.mod @@ -5,6 +5,7 @@ go 1.24.3 require ( github.com/docker/docker v28.2.2+incompatible github.com/redis/go-redis/v9 v9.10.0 + github.com/stretchr/testify v1.10.0 ) require ( @@ -12,6 +13,7 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/containerd/errdefs v1.0.0 // indirect github.com/containerd/errdefs/pkg v0.3.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/distribution/reference v0.6.0 // indirect github.com/docker/go-connections v0.5.0 // indirect @@ -24,10 +26,12 @@ require ( github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.1 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 // indirect go.opentelemetry.io/otel v1.36.0 // indirect go.opentelemetry.io/otel/metric v1.36.0 // indirect go.opentelemetry.io/otel/trace v1.36.0 // indirect golang.org/x/sys v0.33.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/src/go.sum b/src/go.sum index f400861..ebe6cf3 100644 --- a/src/go.sum +++ b/src/go.sum @@ -6,6 +6,7 @@ github.com/containerd/errdefs v1.0.0 h1:tg5yIfIlQIrxYtu9ajqY42W3lpS19XqdxRQeEwYG github.com/containerd/errdefs v1.0.0/go.mod h1:+YBYIdtsnF4Iw6nWZhJcqGSg/dwvV7tyJ/kCkyJ2k+M= github.com/containerd/errdefs/pkg v0.3.0 h1:9IKJ06FvyNlexW690DXuQNx2KA2cUJXx151Xdx3ZPPE= github.com/containerd/errdefs/pkg v0.3.0/go.mod h1:NJw6s9HwNuRhnjJhM7pylWwMyAkmCQvQ4GpJHEqRLVk= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= @@ -38,12 +39,15 @@ github.com/opencontainers/image-spec v1.1.1/go.mod h1:qpqAh3Dmcf36wStyyWU+kCeDgr github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/redis/go-redis/v9 v9.10.0 h1:FxwK3eV8p/CQa0Ch276C7u2d0eNC9kCmAYQ7mCXCzVs= github.com/redis/go-redis/v9 v9.10.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= @@ -85,3 +89,6 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/src/int_test.go b/src/int_test.go index d796dde..b211651 100644 --- a/src/int_test.go +++ b/src/int_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "sync" "os/signal" "syscall" "testing" @@ -34,7 +35,10 @@ func TestIntegration(t *testing.T) { signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) // test jobs + var wg sync.WaitGroup + wg.Add(1) go func() { + defer wg.Done() jobTypes := []string{"a", "b", "c"} for i := 0; i < 10; i++ { jobType := jobTypes[i%len(jobTypes)] @@ -48,5 +52,7 @@ func TestIntegration(t *testing.T) { } } }() + + wg.Wait() supervisor.Stop() } diff --git a/src/scheduler.go b/src/scheduler.go index 7123f37..30642a7 100644 --- a/src/scheduler.go +++ b/src/scheduler.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "log" - "time" "github.com/redis/go-redis/v9" ) @@ -27,13 +26,7 @@ func NewScheduler(redisAddr string) *Scheduler { } func (s *Scheduler) Enqueue(jobType string, payload map[string]interface{}) error { - job := Job{ - ID: generateJobID(), - Type: jobType, - Payload: payload, - Retries: 0, - Created: time.Now(), - } + job := NewJob(jobType, payload, "") // TODO account for gpu type jobData, err := json.Marshal(job) if err != nil { diff --git a/src/supervisor_test.go b/src/supervisor_test.go new file mode 100644 index 0000000..fae1e6c --- /dev/null +++ b/src/supervisor_test.go @@ -0,0 +1,23 @@ +package main + +import ( + "fmt" + "os" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCanHandleJob(t *testing.T) { + redisAddr := "localhost:6379" + consumerID := fmt.Sprintf("worker_%d", os.Getpid()) + supervisor := NewSupervisor(redisAddr, consumerID, "AMD") + + jobAMD := NewJob("", nil, "AMD") + jobNVIDIA := NewJob("", nil, "NVIDIA") + jobAny := NewJob("", nil, "") + + assert.True(t, supervisor.canHandleJob(jobAMD)) + assert.True(t, supervisor.canHandleJob(jobAny)) + assert.False(t, supervisor.canHandleJob(jobNVIDIA)) +} diff --git a/src/util.go b/src/util.go index 9a2e05c..15a107d 100644 --- a/src/util.go +++ b/src/util.go @@ -22,6 +22,17 @@ type Job struct { RequiredGPU string `json:"gpu"` } +func NewJob(jobType string, payload map[string]interface{}, requiredGPU string) Job { + return Job{ + ID: generateJobID(), + Type: jobType, + Payload: payload, + Retries: 0, + RequiredGPU: requiredGPU, + Created: time.Now(), + } +} + func generateJobID() string { return fmt.Sprintf("job_%d_%d", time.Now().UnixNano(), os.Getpid()) }