diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index c601e61..e7cda7f 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -33,10 +33,14 @@ jobs: # Service containers to run with `container-job` services: - nats: + nats01: image: nats ports: - 4222:4222 + nats02: + image: nats + ports: + - 4223:4222 steps: - name: Set up Go ${{ matrix.go }} diff --git a/nats_test.go b/nats_test.go index 269915d..4058c0a 100644 --- a/nats_test.go +++ b/nats_test.go @@ -12,12 +12,13 @@ import ( "github.com/golang-queue/queue" "github.com/golang-queue/queue/core" "github.com/golang-queue/queue/job" + "github.com/nats-io/nats.go" "github.com/stretchr/testify/assert" "go.uber.org/goleak" ) -var host = "127.0.0.1" +var host = nats.DefaultURL func TestMain(m *testing.M) { goleak.VerifyTestMain(m) @@ -36,7 +37,28 @@ func TestDefaultFlow(t *testing.T) { Message: "foo", } w := NewWorker( - WithAddr(host+":4222"), + WithAddr(host), + WithSubj("test"), + WithQueue("test"), + ) + q, err := queue.NewQueue( + queue.WithWorker(w), + queue.WithWorkerCount(1), + ) + assert.NoError(t, err) + assert.NoError(t, q.Queue(m)) + assert.NoError(t, q.Queue(m)) + q.Start() + time.Sleep(500 * time.Millisecond) + q.Release() +} + +func TestClusteredHost(t *testing.T) { + m := &mockMessage{ + Message: "foo", + } + w := NewWorker( + WithAddr(host, "nats://localhost:4223"), WithSubj("test"), WithQueue("test"), ) @@ -54,7 +76,7 @@ func TestDefaultFlow(t *testing.T) { func TestShutdown(t *testing.T) { w := NewWorker( - WithAddr(host+":4222"), + WithAddr(host), WithSubj("test"), WithQueue("test"), ) @@ -76,7 +98,7 @@ func TestCustomFuncAndWait(t *testing.T) { Message: "foo", } w := NewWorker( - WithAddr(host+":4222"), + WithAddr(host), WithSubj("test"), WithQueue("test"), WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error { @@ -107,7 +129,7 @@ func TestEnqueueJobAfterShutdown(t *testing.T) { Message: "foo", } w := NewWorker( - WithAddr(host + ":4222"), + WithAddr(host), ) q, err := queue.NewQueue( queue.WithWorker(w), @@ -129,7 +151,7 @@ func TestJobReachTimeout(t *testing.T) { Message: "foo", } w := NewWorker( - WithAddr(host+":4222"), + WithAddr(host), WithSubj("JobReachTimeout"), WithQueue("test"), WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error { @@ -169,7 +191,7 @@ func TestCancelJobAfterShutdown(t *testing.T) { Message: "test", } w := NewWorker( - WithAddr(host+":4222"), + WithAddr(host), WithSubj("CancelJob"), WithQueue("test"), WithLogger(queue.NewLogger()), @@ -210,7 +232,7 @@ func TestGoroutineLeak(t *testing.T) { Message: "foo", } w := NewWorker( - WithAddr(host+":4222"), + WithAddr(host), WithSubj("GoroutineLeak"), WithQueue("test"), WithLogger(queue.NewEmptyLogger()), @@ -256,7 +278,7 @@ func TestGoroutinePanic(t *testing.T) { Message: "foo", } w := NewWorker( - WithAddr(host+":4222"), + WithAddr(host), WithSubj("GoroutinePanic"), WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error { panic("missing something") @@ -282,7 +304,7 @@ func TestReQueueTaskInWorkerBeforeShutdown(t *testing.T) { Payload: []byte("foo"), } w := NewWorker( - WithAddr(host+":4222"), + WithAddr(host), WithSubj("test02"), WithQueue("test02"), ) diff --git a/options.go b/options.go index 779705e..dfc36af 100644 --- a/options.go +++ b/options.go @@ -2,9 +2,12 @@ package nats import ( "context" + "strings" "github.com/golang-queue/queue" "github.com/golang-queue/queue/core" + + "github.com/nats-io/nats.go" ) // Option for queue system @@ -19,9 +22,11 @@ type options struct { } // WithAddr setup the addr of NATS -func WithAddr(addr string) Option { +func WithAddr(addrs ...string) Option { return func(w *options) { - w.addr = "nats://" + addr + if len(addrs) > 0 { + w.addr = strings.Join(addrs, ",") + } } } @@ -55,7 +60,7 @@ func WithLogger(l queue.Logger) Option { func newOptions(opts ...Option) options { defaultOpts := options{ - addr: "127.0.0.1:4222", + addr: nats.DefaultURL, subj: "foobar", queue: "foobar", logger: queue.NewLogger(),