Skip to content

Commit 1e4528a

Browse files
committed
feat: implement broker unsubscribe
1 parent f4460a1 commit 1e4528a

File tree

3 files changed

+70
-11
lines changed

3 files changed

+70
-11
lines changed

broker.go

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
type brokerer interface {
1515
Publish(topic string, value *value) error
1616
Subscribe(topic string) *consumer
17+
Unsubscribe(topic, id string) error
1718
Purge(topic string) error
1819
Topics() ([]string, error)
1920
}
@@ -72,12 +73,12 @@ func processTopics(b *broker, topics []string) error {
7273
continue
7374
}
7475

75-
// log.Debug().
76-
// Str("topic", t).
77-
// Int("count", count).
78-
// Msg("returning delayed messages")
79-
8076
if count >= 1 {
77+
log.Debug().
78+
Str("topic", t).
79+
Int("count", count).
80+
Msg("returning delayed messages")
81+
8182
b.NotifyConsumer(t, eventTypeMsgReturned)
8283
}
8384
}
@@ -114,6 +115,22 @@ func (b *broker) Subscribe(topic string) *consumer {
114115
return &cons
115116
}
116117

118+
// Unsubscribe removes the consumer from the available pool for the topic.
119+
func (b *broker) Unsubscribe(topic, id string) error {
120+
b.Lock()
121+
defer b.Unlock()
122+
123+
consumers := b.consumers[topic]
124+
for i, v := range consumers {
125+
if v.id == id {
126+
b.consumers[topic] = append(consumers[:i], consumers[i+1:]...)
127+
return nil
128+
}
129+
}
130+
131+
return fmt.Errorf("consumer ID %s not found for topic %s", id, topic)
132+
}
133+
117134
// Purge removes the topic from the broker.
118135
func (b *broker) Purge(topic string) error {
119136
if err := b.store.Purge(topic); err != nil {
@@ -138,7 +155,9 @@ func (b *broker) NotifyConsumer(topic string, ev eventType) {
138155
select {
139156
case c.eventChan <- ev:
140157
return
141-
default: // If there is noone listening noop
158+
default:
159+
// TODO if it fails to send to the consumer, find another consumer to send
160+
// the message to and possibly remove this consumer from the pool.
142161
}
143162
}
144163
}

broker_mock.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

broker_test.go

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ import (
44
"testing"
55

66
gomock "github.com/golang/mock/gomock"
7-
"github.com/stretchr/testify/assert"
7+
"github.com/stretchr/testify/require"
88
)
99

10-
func TestBrokerPublish(t *testing.T) {
10+
func TestBroker_Publish(t *testing.T) {
1111
ctrl := gomock.NewController(t)
1212
defer ctrl.Finish()
1313

@@ -21,10 +21,10 @@ func TestBrokerPublish(t *testing.T) {
2121

2222
b := newBroker(mockStore)
2323

24-
assert.NoError(t, b.Publish(topic, value))
24+
require.NoError(t, b.Publish(topic, value))
2525
}
2626

27-
func TestBrokerSubscribe(t *testing.T) {
27+
func TestBroker_Subscribe(t *testing.T) {
2828
ctrl := gomock.NewController(t)
2929
defer ctrl.Finish()
3030

@@ -37,5 +37,31 @@ func TestBrokerSubscribe(t *testing.T) {
3737
b := newBroker(mockStore)
3838
c := b.Subscribe(topic)
3939

40-
assert.IsType(t, &consumer{}, c)
40+
require.IsType(t, &consumer{}, c)
41+
}
42+
43+
func TestBroker_Unsubscribe(t *testing.T) {
44+
t.Run("removes consumer from the topic", func(t *testing.T) {
45+
b := broker{
46+
consumers: map[string][]consumer{},
47+
}
48+
49+
topic := "test_topic"
50+
51+
c := b.Subscribe(topic)
52+
err := b.Unsubscribe(topic, c.id)
53+
require.NoError(t, err)
54+
require.Len(t, b.consumers[topic], 0)
55+
})
56+
57+
t.Run("returns an error if the consumer doesn't exist", func(t *testing.T) {
58+
b := broker{
59+
consumers: map[string][]consumer{},
60+
}
61+
62+
topic := "test_topic"
63+
64+
err := b.Unsubscribe(topic, "test_id")
65+
require.Error(t, err)
66+
})
4167
}

0 commit comments

Comments
 (0)