Skip to content

Commit 99e66fa

Browse files
committed
consumer topic renamed to name
1 parent 38c7a5f commit 99e66fa

File tree

8 files changed

+52
-52
lines changed

8 files changed

+52
-52
lines changed

Makefile

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ test: up integration
2626

2727
integration:
2828
@mkdir -p coverage
29-
@go test -race -v -tags=integration -coverpkg=./... -coverprofile=./coverage/base.cov ./...
30-
@cd aws && go test -race -v -tags=integration -coverpkg=./... -coverprofile=../coverage/awsv2.cov ./...
31-
@cd schedule/storage/postgres && go test -race -v -tags=integration -coverpkg=./... -coverprofile=../../../coverage/postgres.cov ./...
29+
@go test -v -tags=integration -coverpkg=./... -coverprofile=./coverage/base.cov ./...
30+
@cd aws && go test -v -tags=integration -coverpkg=./... -coverprofile=../coverage/awsv2.cov ./...
31+
@cd schedule/storage/postgres && go test -v -tags=integration -coverpkg=./... -coverprofile=../../../coverage/postgres.cov ./...
3232

3333
.PHONY: clean
3434
clean:

handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func (f HandlerFunc) HandleMessage(ctx context.Context, message *Message) error
2424
}
2525

2626
// Dispatcher is a message handler middleware that can be used to register
27-
// different handlers for the same topic, based on the message name.
27+
// different handlers for the same consumer, based on the message name.
2828
func Dispatcher(handlers map[string]Handler) HandlerFunc {
2929
return func(ctx context.Context, message *Message) error {
3030
h, ok := handlers[message.Name]

publisher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88

99
var (
1010
// ErrResourceDoesNotExist indicates that the resource (topic, queue...) does not exist.
11-
ErrResourceDoesNotExist = errors.New("topic does not exist")
11+
ErrResourceDoesNotExist = errors.New("name does not exist")
1212
)
1313

1414
// Publisher describes the top level method to publish messages.

publisher_recorder_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ func TestPublisherRecorder(t *testing.T) {
99
ctx := context.Background()
1010
p := NewPublisherRecorder(NoOpPublisher())
1111

12-
err := p.Publish(ctx, "topic-1", &Message{
12+
err := p.Publish(ctx, "name-1", &Message{
1313
ID: "123",
1414
}, &Message{
1515
ID: "456",
@@ -18,7 +18,7 @@ func TestPublisherRecorder(t *testing.T) {
1818
t.Fatal("unexpected error publishing", err)
1919
}
2020

21-
err = p.Publish(ctx, "topic-2", &Message{
21+
err = p.Publish(ctx, "name-2", &Message{
2222
ID: "987",
2323
}, &Message{
2424
ID: "654",
@@ -35,27 +35,27 @@ func TestPublisherRecorder(t *testing.T) {
3535
topic string
3636
id string
3737
}{
38-
{"topic-1", "123"},
39-
{"topic-1", "456"},
40-
{"topic-2", "987"},
41-
{"topic-2", "654"},
38+
{"name-1", "123"},
39+
{"name-1", "456"},
40+
{"name-2", "987"},
41+
{"name-2", "654"},
4242
} {
4343
if got, want := p.Messages()[i].Topic, want.topic; got != want {
44-
t.Fatalf("unexpected message topic at position %d; got %s, want %s", i, got, want)
44+
t.Fatalf("unexpected message name at position %d; got %s, want %s", i, got, want)
4545
}
4646
if got, want := p.Messages()[i].Message.ID, want.id; got != want {
4747
t.Fatalf("unexpected message id at position %d; got %s, want %s", i, got, want)
4848
}
4949
}
5050

5151
for i, want := range []string{"123", "456"} {
52-
if got := p.MessagesMap()["topic-1"][i].ID; got != want {
52+
if got := p.MessagesMap()["name-1"][i].ID; got != want {
5353
t.Fatalf("unexpected message id at position %d; got %s, want %s", i, got, want)
5454
}
5555
}
5656

5757
for i, want := range []string{"987", "654"} {
58-
if got := p.TopicMessages("topic-2")[i].ID; got != want {
58+
if got := p.TopicMessages("name-2")[i].ID; got != want {
5959
t.Fatalf("unexpected message id at position %d; got %s, want %s", i, got, want)
6060
}
6161
}

publisher_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func TestPublisher_PublishHandlerSendMessages(t *testing.T) {
5656
t.Fatalf("expected error; got %v, want %v", err, dummyErr)
5757
}
5858
if publishedTopic != topicToSend {
59-
t.Fatalf("unpected pubslihing topic; got %v, want %v", publishedTopic, topicToSend)
59+
t.Fatalf("unpected pubslihing name; got %v, want %v", publishedTopic, topicToSend)
6060
}
6161
if reflect.DeepEqual(messagesToSend, publishedMessages) {
6262
t.Fatalf("expected error; got %v, want %v", err, dummyErr)
@@ -91,7 +91,7 @@ func TestPublisher_Wrapper(t *testing.T) {
9191
t.Fatalf("message does not have the injected attribute; got %+v", e.Attributes)
9292
}
9393
if e.Name != topic {
94-
t.Fatalf("topic should be used as event name; got %+v", e.Name)
94+
t.Fatalf("name should be used as event name; got %+v", e.Name)
9595
}
9696
}
9797
return nil
@@ -105,7 +105,7 @@ func TestPublisher_Wrapper(t *testing.T) {
105105
)
106106

107107
ctx := context.WithValue(context.Background(), ctxKey, injectedValue)
108-
err := p.Publish(ctx, "foo-topic", &pubsub.Message{})
108+
err := p.Publish(ctx, "foo-name", &pubsub.Message{})
109109
if err != nil {
110110
t.Fatal("unexpected error publishing", err)
111111
}

router.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ import (
1111
)
1212

1313
var (
14-
ErrTopicAlreadyRegistered = errors.New("topic already registered")
15-
ErrRouterAlreadyRunning = errors.New("router already running")
16-
ErrRouterAlreadyStopped = errors.New("router already stopped")
14+
ErrConsumerAlreadyRegistered = errors.New("consumer already registered")
15+
ErrRouterAlreadyRunning = errors.New("router already running")
16+
ErrRouterAlreadyStopped = errors.New("router already stopped")
1717
)
1818

1919
type status uint8
@@ -37,9 +37,9 @@ const (
3737
// Consumer consumes messages from a single subscription.
3838
type consumer struct {
3939
subscriber Subscriber
40-
handler Handler
41-
topic string
42-
next <-chan Next
40+
handler Handler
41+
name string
42+
next <-chan Next
4343
backoff BackoffStrategy
4444
ackDecider AckDecider
4545
}
@@ -134,7 +134,7 @@ func WithBackoff(strategy BackoffStrategy) func(*consumer) {
134134
}
135135
}
136136

137-
func (r *Router) Register(topic string, subscriber Subscriber, handler Handler, opts ...ConsumerOption) error {
137+
func (r *Router) Register(name string, subscriber Subscriber, handler Handler, opts ...ConsumerOption) error {
138138
r.mx.Lock()
139139
defer r.mx.Unlock()
140140

@@ -145,24 +145,24 @@ func (r *Router) Register(topic string, subscriber Subscriber, handler Handler,
145145
return ErrRouterAlreadyStopped
146146
}
147147

148-
_, found := r.consumers[topic]
148+
_, found := r.consumers[name]
149149
if found {
150-
return fmt.Errorf("%w: %s", ErrTopicAlreadyRegistered, topic)
150+
return fmt.Errorf("%w: %s", ErrConsumerAlreadyRegistered, name)
151151
}
152152

153153
if r.consumers == nil {
154154
r.consumers = make(map[string]*consumer)
155155
}
156156
c := &consumer{
157-
topic: topic,
157+
name: name,
158158
subscriber: subscriber,
159159
handler: handler,
160160
}
161161
for _, opt := range opts {
162162
opt(c)
163163
}
164164

165-
r.consumers[topic] = c
165+
r.consumers[name] = c
166166

167167
return nil
168168
}
@@ -211,7 +211,7 @@ func (r *Router) subscribe(consumers map[string]*consumer) ([]*consumer, error)
211211
g.Go(func() error {
212212
next, subscribeErr := c.subscriber.Subscribe()
213213
if subscribeErr != nil {
214-
subscribeErr = fmt.Errorf("subscribe to topic %s failed: %w", c.topic, subscribeErr)
214+
subscribeErr = fmt.Errorf("subscribe to name %s failed: %w", c.name, subscribeErr)
215215
}
216216

217217
if subscribeErr == nil {
@@ -249,7 +249,7 @@ func (r *Router) stop(consumers []*consumer) error {
249249
g.Go(func() error {
250250
stopErr := c.subscriber.Stop(ctx)
251251
if stopErr != nil {
252-
stopErr = fmt.Errorf("error stopping subscriber for topic %s: %w", c.topic, stopErr)
252+
stopErr = fmt.Errorf("error stopping subscriber for consumer %s: %w", c.name, stopErr)
253253
}
254254
return stopErr
255255
})
@@ -265,7 +265,7 @@ func (r *Router) run(ctx context.Context, consumers []*consumer) (err error) {
265265
g.Go(func() error {
266266
consumerErr := r.consume(ctx, c)
267267
if consumerErr != nil {
268-
consumerErr = fmt.Errorf("error consuming from topic %s: %w", c.topic, consumerErr)
268+
consumerErr = fmt.Errorf("error consuming from consumer %s: %w", c.name, consumerErr)
269269
}
270270
return consumerErr
271271
})
@@ -293,7 +293,7 @@ func (r *Router) consume(ctx context.Context, c *consumer) error {
293293
continue
294294
}
295295

296-
data, err := r.Unmarshaller.Unmarshal(c.topic, rmsg)
296+
data, err := r.Unmarshaller.Unmarshal(c.name, rmsg)
297297
if err := r.check(ctx, r.OnUnmarshal, c, rmsg, err); err != nil {
298298
return err
299299
}
@@ -325,7 +325,7 @@ func (r *Router) consume(ctx context.Context, c *consumer) error {
325325

326326
func (r *Router) check(ctx context.Context, f Checkpoint, c *consumer, msg ReceivedMessage, err error) error {
327327
if f != nil {
328-
return f(ctx, c.topic, msg, err)
328+
return f(ctx, c.name, msg, err)
329329
}
330330
return nil
331331
}
@@ -370,10 +370,10 @@ func (r *Router) messageContext(ctx context.Context, msg ReceivedMessage) contex
370370

371371
func (r *Router) ack(ctx context.Context, c *consumer, msg ReceivedMessage, err error) Acknowledgement {
372372
if c.ackDecider != nil {
373-
return c.ackDecider(ctx, c.topic, msg, err)
373+
return c.ackDecider(ctx, c.name, msg, err)
374374
}
375375

376-
return r.AckDecider(ctx, c.topic, msg, err)
376+
return r.AckDecider(ctx, c.name, msg, err)
377377
}
378378

379379
func (r *Router) backoff(c *consumer, msg *Message) time.Duration {

router_test.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,17 @@ func TestRouter_Run(t *testing.T) {
3131
}
3232
})
3333

34-
t.Run("cannot subscribe twice to the same topic", func(t *testing.T) {
34+
t.Run("cannot subscribe twice using the same consumer name", func(t *testing.T) {
3535
var router pubsub.Router
3636

3737
err := router.Register("same", &stubs.SubscriberStub{}, handlerDummy)
3838
if err != nil {
39-
t.Fatalf("unexpected error registering the topic: %v", err)
39+
t.Fatalf("unexpected error registering the consumer: %v", err)
4040
}
4141

4242
err = router.Register("same", &stubs.SubscriberStub{}, handlerDummy)
4343
if err == nil {
44-
t.Fatalf("unexpected success registering the topic again: %v", err)
44+
t.Fatalf("unexpected success registering the consumer again: %v", err)
4545
}
4646
})
4747

@@ -160,7 +160,7 @@ func TestRouter_Run(t *testing.T) {
160160
})
161161

162162
t.Run("checkpoints are called", func(t *testing.T) {
163-
subscriberTopic := "foo"
163+
consumerName := "foo"
164164

165165
var checkpointsCalled int
166166

@@ -171,31 +171,31 @@ func TestRouter_Run(t *testing.T) {
171171
return nil
172172
}
173173

174-
verifyCheckpoint := func(checkpoint string, topic string, message pubsub.ReceivedMessage, err error) error {
174+
verifyCheckpoint := func(checkpoint string, name string, message pubsub.ReceivedMessage, err error) error {
175175
checkpointsCalled++
176176
if err != nil {
177177
return err
178178
}
179179
if message.(*stubs.ReceivedMessageStub) != msg {
180180
return fmt.Errorf("%s mesage is not the equal; got %+v", checkpoint, message)
181181
}
182-
if topic != subscriberTopic {
183-
return fmt.Errorf("%s topic is not correct; got %+v, want %s", checkpoint, topic, subscriberTopic)
182+
if name != consumerName {
183+
return fmt.Errorf("%s consumer is not correct; got %+v, want %s", checkpoint, name, consumerName)
184184
}
185185
return nil
186186
}
187187
router := pubsub.Router{
188-
OnReceive: func(_ context.Context, topic string, message pubsub.ReceivedMessage, err error) error {
189-
return verifyCheckpoint("OnReceive", topic, message, err)
188+
OnReceive: func(_ context.Context, name string, message pubsub.ReceivedMessage, err error) error {
189+
return verifyCheckpoint("OnReceive", name, message, err)
190190
},
191-
OnUnmarshal: func(_ context.Context, topic string, message pubsub.ReceivedMessage, err error) error {
192-
return verifyCheckpoint("OnUnmarshal", topic, message, err)
191+
OnUnmarshal: func(_ context.Context, name string, message pubsub.ReceivedMessage, err error) error {
192+
return verifyCheckpoint("OnUnmarshal", name, message, err)
193193
},
194-
OnHandler: func(_ context.Context, topic string, message pubsub.ReceivedMessage, err error) error {
195-
return verifyCheckpoint("OnHandler", topic, message, err)
194+
OnHandler: func(_ context.Context, name string, message pubsub.ReceivedMessage, err error) error {
195+
return verifyCheckpoint("OnHandler", name, message, err)
196196
},
197-
OnAck: func(_ context.Context, topic string, message pubsub.ReceivedMessage, err error) error {
198-
return verifyCheckpoint("OnAck", topic, message, err)
197+
OnAck: func(_ context.Context, name string, message pubsub.ReceivedMessage, err error) error {
198+
return verifyCheckpoint("OnAck", name, message, err)
199199
},
200200
}
201201

@@ -211,7 +211,7 @@ func TestRouter_Run(t *testing.T) {
211211
},
212212
}
213213

214-
err := router.Register(subscriberTopic, s, handlerDummy)
214+
err := router.Register(consumerName, s, handlerDummy)
215215
if err != nil {
216216
t.Fatal("cannot register handler", err)
217217
}

scheduler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func TestPublisher(t *testing.T) {
3333
return fmt.Errorf("due date is not in range: expected %v; got %v", expected, dueDate)
3434
}
3535
if gotTopic != topic {
36-
return fmt.Errorf("unexpected topic: expected %v; got %v", topic, gotTopic)
36+
return fmt.Errorf("unexpected name: expected %v; got %v", topic, gotTopic)
3737
}
3838
if len(envelopes) != 1 {
3939
return fmt.Errorf("no messages sent")

0 commit comments

Comments
 (0)