diff --git a/jetstream.go b/jetstream.go index a25cca3..27ef0c0 100644 --- a/jetstream.go +++ b/jetstream.go @@ -13,9 +13,10 @@ type ( JetStream interface { Publish(subject string, value []byte, opts ...nats.PubOpt) (*nats.PubAck, error) QueueSubscribe(subj, queue string, cb nats.MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error) - Close() + Subscribe(subj string, cb nats.MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error) AddStream(cfg *nats.StreamConfig, opts ...nats.JSOpt) (*nats.StreamInfo, error) ConsumerInfo(streamName, consumerName string, opts ...nats.JSOpt) (*nats.ConsumerInfo, error) + GetNATSConnection() *nats.Conn } // jsImpl JetStream implementation @@ -150,6 +151,14 @@ func connect(url string, options ...nats.Option) (*nats.Conn, error) { return nc, nil } +// GetNATSConnection :nodoc: +func (j *jsImpl) GetNATSConnection() *nats.Conn { + if j == nil { + return nil + } + return j.natsConn +} + func (j *jsImpl) checkConnIsValid() (b bool) { return j.natsConn != nil && j.natsConn.IsConnected() } @@ -170,11 +179,12 @@ func (j *jsImpl) QueueSubscribe(subj, queue string, cb nats.MsgHandler, opts ... return j.jsCtx.QueueSubscribe(subj, queue, cb, opts...) } -// Close close NATS connection -func (j *jsImpl) Close() { - if j.checkConnIsValid() { - j.natsConn.Close() +// Subscribe :nodoc: +func (j *jsImpl) Subscribe(subj string, cb nats.MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error) { + if !j.checkConnIsValid() { + return nil, ErrConnectionLost } + return j.jsCtx.Subscribe(subj, cb, opts...) } // AddStream add stream @@ -193,7 +203,7 @@ func (j *jsImpl) AddStream(cfg *nats.StreamConfig, opts ...nats.JSOpt) (*nats.St } -// ConsumerInfo +// ConsumerInfo :nodoc: func (j *jsImpl) ConsumerInfo(streamName, consumerName string, opts ...nats.JSOpt) (*nats.ConsumerInfo, error) { if !j.checkConnIsValid() { return nil, ErrConnectionLost @@ -201,3 +211,18 @@ func (j *jsImpl) ConsumerInfo(streamName, consumerName string, opts ...nats.JSOp return j.jsCtx.ConsumerInfo(streamName, consumerName, opts...) } + +// SafeClose :nodoc: +func SafeClose(js JetStream) { + if js == nil { + return + } + natsConn := js.GetNATSConnection() + if natsConn == nil { + return + } + if !natsConn.IsConnected() { + return + } + natsConn.Close() +} diff --git a/jetstream_test.go b/jetstream_test.go index c3f6140..9dff18c 100644 --- a/jetstream_test.go +++ b/jetstream_test.go @@ -38,9 +38,7 @@ func TestPublish(t *testing.T) { n, err := NewNATSConnection(defaultURL, natsOpts...) require.NoError(t, err) - defer func() { - n.Close() - }() + defer SafeClose(n) streamConf := &nats.StreamConfig{ Name: "STREAM_NAME_PUBLISH", @@ -61,9 +59,7 @@ func TestQueueSubscribe(t *testing.T) { n, err := NewNATSConnection(defaultURL) require.NoError(t, err) - defer func() { - n.Close() - }() + defer SafeClose(n) streamConf := &nats.StreamConfig{ Name: "STREAM_NAME_QUEUE_SUBSCRIBE", @@ -110,10 +106,7 @@ func TestQueueSubscribe(t *testing.T) { t.Run("queue subscribe NatsEventAuditLogMessage", func(t *testing.T) { n, err := NewNATSConnection(defaultURL) require.NoError(t, err) - - defer func() { - n.Close() - }() + defer SafeClose(n) streamConf := &nats.StreamConfig{ Name: "STREAM_NAME_AUDIT", @@ -190,13 +183,157 @@ func TestQueueSubscribe(t *testing.T) { }) } +func TestSubscribe(t *testing.T) { + t.Run("subscribe NatsEventMessage", func(t *testing.T) { + n, err := NewNATSConnection(defaultURL) + require.NoError(t, err) + + n2, err := NewNATSConnection(defaultURL) + require.NoError(t, err) + defer SafeClose(n) + defer SafeClose(n2) + + streamConf := &nats.StreamConfig{ + Name: "STREAM_NAME_SUBSCRIBE", + Subjects: []string{"STREAM_NAME_SUBSCRIBE.*"}, + Storage: nats.FileStorage, + } + + _, err = n.AddStream(streamConf) + require.NoError(t, err) + + countMsg := 10 + subject := "STREAM_NAME_SUBSCRIBE.TEST" + + msgBytes, err := NewNatsEventMessage().WithEvent(&NatsEvent{ + ID: int64(1232), + UserID: int64(21), + }).Build() + + require.NoError(t, err) + + for i := 0; i < countMsg; i++ { + _, err = n.Publish(subject, msgBytes) + require.NoError(t, err) + } + + receiverCh := make(chan *nats.Msg) + sub, err := n.Subscribe(subject, func(msg *nats.Msg) { + receiverCh <- msg + }) + require.NoError(t, err) + + sub2, err := n2.Subscribe(subject, func(msg *nats.Msg) { + receiverCh <- msg + }) + require.NoError(t, err) + + // receive double of the message count beacuse of 2 subscriber + for i := 0; i < countMsg*2; i++ { + b := <-receiverCh + + assert.Equal(t, msgBytes, b.Data) + assert.Equal(t, subject, b.Subject, "test subject") + } + + _ = sub.Unsubscribe() + _ = sub2.Unsubscribe() + }) + + t.Run("subscribe NatsEventAuditLogMessage", func(t *testing.T) { + n, err := NewNATSConnection(defaultURL) + require.NoError(t, err) + + n2, err := NewNATSConnection(defaultURL) + require.NoError(t, err) + + defer SafeClose(n) + defer SafeClose(n2) + + streamConf := &nats.StreamConfig{ + Name: "STREAM_NAME_AUDIT", + Subjects: []string{"STREAM_NAME_AUDIT.*"}, + Storage: nats.FileStorage, + } + + _, err = n.AddStream(streamConf) + require.NoError(t, err) + + countMsg := 10 + subject := "STREAM_NAME_AUDIT.TEST_NATS_EVENT_AUDIT_LOG_MESSAGE" + + type User struct { + ID int64 `json:"id"` + Name string `json:"name"` + } + + oldData := User{ + ID: int64(123), + Name: "test name", + } + + newData := User{ + ID: int64(123), + Name: "new test name", + } + + byteOldData, err := json.Marshal(oldData) + require.NoError(t, err) + byteNewData, err := json.Marshal(newData) + require.NoError(t, err) + + createdAt, err := time.Parse("2006-01-02", "2020-01-29") + require.NoError(t, err) + + msg := &NatsEventAuditLogMessage{ + ServiceName: "test-audit", + UserID: 123, + AuditableType: "user", + AuditableID: "123", + Action: "update", + AuditedChanges: string(byteNewData), + OldData: string(byteOldData), + NewData: string(byteNewData), + CreatedAt: createdAt, + Error: nil, + } + msgBytes, err := msg.Build() + require.NoError(t, err) + + for i := 0; i < countMsg; i++ { + _, err = n.Publish(subject, msgBytes) + require.NoError(t, err) + + } + + receiverCh := make(chan *nats.Msg) + sub, err := n.Subscribe(subject, func(msg *nats.Msg) { + receiverCh <- msg + }) + require.NoError(t, err) + + sub2, err := n2.Subscribe(subject, func(msg *nats.Msg) { + receiverCh <- msg + }) + require.NoError(t, err) + + for i := 0; i < countMsg*2; i++ { + b := <-receiverCh + + assert.Equal(t, msgBytes, b.Data) + assert.Equal(t, subject, b.Subject, "test subject") + } + + _ = sub.Unsubscribe() + _ = sub2.Unsubscribe() + }) +} + func TestAddStream(t *testing.T) { n, err := NewNATSConnection(defaultURL) require.NoError(t, err) - defer func() { - n.Close() - }() + defer SafeClose(n) streamConf := &nats.StreamConfig{ Name: "STREAM_NAMEXX", @@ -223,7 +360,7 @@ func TestAddStream(t *testing.T) { func TestConsumerInfo(t *testing.T) { n, err := NewNATSConnection(defaultURL) require.NoError(t, err) - defer n.Close() + defer SafeClose(n) updateConf := &nats.StreamConfig{ Name: "STREAM_NAME", diff --git a/mock/mock_jetstream.go b/mock/mock_jetstream.go index f93022d..449c113 100644 --- a/mock/mock_jetstream.go +++ b/mock/mock_jetstream.go @@ -54,18 +54,6 @@ func (mr *MockJetStreamMockRecorder) AddStream(arg0 interface{}, arg1 ...interfa return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddStream", reflect.TypeOf((*MockJetStream)(nil).AddStream), varargs...) } -// Close mocks base method. -func (m *MockJetStream) Close() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Close") -} - -// Close indicates an expected call of Close. -func (mr *MockJetStreamMockRecorder) Close() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockJetStream)(nil).Close)) -} - // ConsumerInfo mocks base method. func (m *MockJetStream) ConsumerInfo(arg0, arg1 string, arg2 ...nats.JSOpt) (*nats.ConsumerInfo, error) { m.ctrl.T.Helper() @@ -86,6 +74,20 @@ func (mr *MockJetStreamMockRecorder) ConsumerInfo(arg0, arg1 interface{}, arg2 . return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConsumerInfo", reflect.TypeOf((*MockJetStream)(nil).ConsumerInfo), varargs...) } +// GetNATSConnection mocks base method. +func (m *MockJetStream) GetNATSConnection() *nats.Conn { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetNATSConnection") + ret0, _ := ret[0].(*nats.Conn) + return ret0 +} + +// GetNATSConnection indicates an expected call of GetNATSConnection. +func (mr *MockJetStreamMockRecorder) GetNATSConnection() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetNATSConnection", reflect.TypeOf((*MockJetStream)(nil).GetNATSConnection)) +} + // Publish mocks base method. func (m *MockJetStream) Publish(arg0 string, arg1 []byte, arg2 ...nats.PubOpt) (*nats.PubAck, error) { m.ctrl.T.Helper() @@ -125,3 +127,23 @@ func (mr *MockJetStreamMockRecorder) QueueSubscribe(arg0, arg1, arg2 interface{} varargs := append([]interface{}{arg0, arg1, arg2}, arg3...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueueSubscribe", reflect.TypeOf((*MockJetStream)(nil).QueueSubscribe), varargs...) } + +// Subscribe mocks base method. +func (m *MockJetStream) Subscribe(arg0 string, arg1 nats.MsgHandler, arg2 ...nats.SubOpt) (*nats.Subscription, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Subscribe", varargs...) + ret0, _ := ret[0].(*nats.Subscription) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Subscribe indicates an expected call of Subscribe. +func (mr *MockJetStreamMockRecorder) Subscribe(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*MockJetStream)(nil).Subscribe), varargs...) +}