Skip to content

Commit

Permalink
fix(address): support clustered usage (#40)
Browse files Browse the repository at this point in the history
* feat: refactor `WithAddr` function to accept multiple addresses

- Add `github.com/nats-io/nats.go` as an import
- Change the `WithAddr` function signature to accept multiple addresses
- Modify the `WithAddr` function to join multiple addresses with commas
- Change the default address value to `nats.DefaultURL`

Signed-off-by: Bo-Yi Wu <[email protected]>
  • Loading branch information
appleboy authored Dec 5, 2023
1 parent 0ddabe8 commit f2d7441
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 14 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,14 @@ jobs:

# Service containers to run with `container-job`
services:
nats:
nats01:
image: nats
ports:
- 4222:4222
nats02:
image: nats
ports:
- 4223:4222

steps:
- name: Set up Go ${{ matrix.go }}
Expand Down
42 changes: 32 additions & 10 deletions nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ import (
"github.com/golang-queue/queue"
"github.com/golang-queue/queue/core"
"github.com/golang-queue/queue/job"
"github.com/nats-io/nats.go"

"github.com/stretchr/testify/assert"
"go.uber.org/goleak"
)

var host = "127.0.0.1"
var host = nats.DefaultURL

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
Expand All @@ -36,7 +37,28 @@ func TestDefaultFlow(t *testing.T) {
Message: "foo",
}
w := NewWorker(
WithAddr(host+":4222"),
WithAddr(host),
WithSubj("test"),
WithQueue("test"),
)
q, err := queue.NewQueue(
queue.WithWorker(w),
queue.WithWorkerCount(1),
)
assert.NoError(t, err)
assert.NoError(t, q.Queue(m))
assert.NoError(t, q.Queue(m))
q.Start()
time.Sleep(500 * time.Millisecond)
q.Release()
}

func TestClusteredHost(t *testing.T) {
m := &mockMessage{
Message: "foo",
}
w := NewWorker(
WithAddr(host, "nats://localhost:4223"),
WithSubj("test"),
WithQueue("test"),
)
Expand All @@ -54,7 +76,7 @@ func TestDefaultFlow(t *testing.T) {

func TestShutdown(t *testing.T) {
w := NewWorker(
WithAddr(host+":4222"),
WithAddr(host),
WithSubj("test"),
WithQueue("test"),
)
Expand All @@ -76,7 +98,7 @@ func TestCustomFuncAndWait(t *testing.T) {
Message: "foo",
}
w := NewWorker(
WithAddr(host+":4222"),
WithAddr(host),
WithSubj("test"),
WithQueue("test"),
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
Expand Down Expand Up @@ -107,7 +129,7 @@ func TestEnqueueJobAfterShutdown(t *testing.T) {
Message: "foo",
}
w := NewWorker(
WithAddr(host + ":4222"),
WithAddr(host),
)
q, err := queue.NewQueue(
queue.WithWorker(w),
Expand All @@ -129,7 +151,7 @@ func TestJobReachTimeout(t *testing.T) {
Message: "foo",
}
w := NewWorker(
WithAddr(host+":4222"),
WithAddr(host),
WithSubj("JobReachTimeout"),
WithQueue("test"),
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
Expand Down Expand Up @@ -169,7 +191,7 @@ func TestCancelJobAfterShutdown(t *testing.T) {
Message: "test",
}
w := NewWorker(
WithAddr(host+":4222"),
WithAddr(host),
WithSubj("CancelJob"),
WithQueue("test"),
WithLogger(queue.NewLogger()),
Expand Down Expand Up @@ -210,7 +232,7 @@ func TestGoroutineLeak(t *testing.T) {
Message: "foo",
}
w := NewWorker(
WithAddr(host+":4222"),
WithAddr(host),
WithSubj("GoroutineLeak"),
WithQueue("test"),
WithLogger(queue.NewEmptyLogger()),
Expand Down Expand Up @@ -256,7 +278,7 @@ func TestGoroutinePanic(t *testing.T) {
Message: "foo",
}
w := NewWorker(
WithAddr(host+":4222"),
WithAddr(host),
WithSubj("GoroutinePanic"),
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
panic("missing something")
Expand All @@ -282,7 +304,7 @@ func TestReQueueTaskInWorkerBeforeShutdown(t *testing.T) {
Payload: []byte("foo"),
}
w := NewWorker(
WithAddr(host+":4222"),
WithAddr(host),
WithSubj("test02"),
WithQueue("test02"),
)
Expand Down
11 changes: 8 additions & 3 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package nats

import (
"context"
"strings"

"github.com/golang-queue/queue"
"github.com/golang-queue/queue/core"

"github.com/nats-io/nats.go"
)

// Option for queue system
Expand All @@ -19,9 +22,11 @@ type options struct {
}

// WithAddr setup the addr of NATS
func WithAddr(addr string) Option {
func WithAddr(addrs ...string) Option {
return func(w *options) {
w.addr = "nats://" + addr
if len(addrs) > 0 {
w.addr = strings.Join(addrs, ",")
}
}
}

Expand Down Expand Up @@ -55,7 +60,7 @@ func WithLogger(l queue.Logger) Option {

func newOptions(opts ...Option) options {
defaultOpts := options{
addr: "127.0.0.1:4222",
addr: nats.DefaultURL,
subj: "foobar",
queue: "foobar",
logger: queue.NewLogger(),
Expand Down

0 comments on commit f2d7441

Please sign in to comment.