Skip to content

Commit c570fe1

Browse files
committed
Removes the queue parameter from any Producer / Consumer function signature
The Producer / Consumer will be created for a fixed queue for a more consistency in the queue - message type association. For any give message type there will be a queue by design, so it makes sense to provide the queue name only in the construct functions.
1 parent d8e0628 commit c570fe1

File tree

7 files changed

+49
-44
lines changed

7 files changed

+49
-44
lines changed

consumer.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,25 @@ import (
1010
//go:generate mockgen -source=driver/driver.go -package=squeue_test -destination=driver_test.go
1111

1212
// NewConsumer creates a new consumer for the given T type of messages
13-
func NewConsumer[T json.Unmarshaler](d driver.Driver) Consumer[T] {
14-
return Consumer[T]{d}
13+
func NewConsumer[T json.Unmarshaler](d driver.Driver, queue string) Consumer[T] {
14+
return Consumer[T]{
15+
driver: d,
16+
queue: queue,
17+
}
1518
}
1619

1720
type Consumer[T json.Unmarshaler] struct {
1821
driver driver.Driver
22+
queue string
1923
}
2024

2125
// Consume retrieves messages from the given queue.
2226
// Any provided options will be sent to the underlying driver.
2327
// The messages are indefinetely consumed from the queue and
2428
// sent to the chan Message[T].
2529
// To stop consuming messages is sufficient to cancel the context.Context
26-
func (p *Consumer[T]) Consume(ctx context.Context, queue string, opts ...func(message any)) (chan Message[T], error) {
27-
messages, err := p.driver.Consume(ctx, queue, opts...)
30+
func (p *Consumer[T]) Consume(ctx context.Context, opts ...func(message any)) (chan Message[T], error) {
31+
messages, err := p.driver.Consume(ctx, p.queue, opts...)
2832
if err != nil {
2933
return nil, wrapErr(err, ErrDriver, nil)
3034
}
@@ -63,6 +67,6 @@ func (p *Consumer[T]) Consume(ctx context.Context, queue string, opts ...func(me
6367

6468
// Ack explicitly acknowldge the message handling.
6569
// It can be implemented as No Operation for some drivers.
66-
func (p *Consumer[T]) Ack(queue string, m Message[T]) error {
67-
return p.driver.Ack(queue, m.ID)
70+
func (p *Consumer[T]) Ack(m Message[T]) error {
71+
return p.driver.Ack(p.queue, m.ID)
6872
}

consumer_test.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,28 +31,28 @@ func (suite *ConsumerTestSuite) TearDownTest() {
3131
}
3232

3333
func (suite *ConsumerTestSuite) TestNewConsumer() {
34-
squeue.NewConsumer[*TestMessage](suite.driver)
34+
squeue.NewConsumer[*TestMessage](suite.driver, "test-queue")
3535
}
3636

3737
func (suite *ConsumerTestSuite) TestConsumeMessages_DriverError() {
38-
consumer := squeue.NewConsumer[*TestMessage](suite.driver)
39-
ctx := context.Background()
4038
queue := "test-queue"
39+
consumer := squeue.NewConsumer[*TestMessage](suite.driver, queue)
40+
ctx := context.Background()
4141

4242
suite.driver.
4343
EXPECT().
4444
Consume(ctx, queue).
4545
Return(nil, errors.New("consume error"))
4646

47-
messages, err := consumer.Consume(ctx, queue)
47+
messages, err := consumer.Consume(ctx)
4848
suite.Nil(messages)
4949
suite.Error(err)
5050
}
5151

5252
func (suite *ConsumerTestSuite) TestConsumeMessages_OneMessageWithError() {
53-
consumer := squeue.NewConsumer[*TestMessage](suite.driver)
54-
ctx := context.Background()
5553
queue := "test-queue"
54+
consumer := squeue.NewConsumer[*TestMessage](suite.driver, queue)
55+
ctx := context.Background()
5656

5757
dMessages := make(chan driver.Message)
5858
go func() {
@@ -65,7 +65,7 @@ func (suite *ConsumerTestSuite) TestConsumeMessages_OneMessageWithError() {
6565
Consume(ctx, queue).
6666
Return(dMessages, nil)
6767

68-
messages, err := consumer.Consume(ctx, queue)
68+
messages, err := consumer.Consume(ctx)
6969

7070
suite.NotNil(messages)
7171
suite.Nil(err)
@@ -80,9 +80,9 @@ func (suite *ConsumerTestSuite) TestConsumeMessages_OneMessageWithError() {
8080
}
8181

8282
func (suite *ConsumerTestSuite) TestConsumeMessages_OneMessageUnmarshallError() {
83-
consumer := squeue.NewConsumer[*TestMessage](suite.driver)
84-
ctx := context.Background()
8583
queue := "test-queue"
84+
consumer := squeue.NewConsumer[*TestMessage](suite.driver, queue)
85+
ctx := context.Background()
8686

8787
dMessages := make(chan driver.Message)
8888
go func() {
@@ -99,7 +99,7 @@ func (suite *ConsumerTestSuite) TestConsumeMessages_OneMessageUnmarshallError()
9999
Consume(ctx, queue).
100100
Return(dMessages, nil)
101101

102-
messages, err := consumer.Consume(ctx, queue)
102+
messages, err := consumer.Consume(ctx)
103103

104104
suite.NotNil(messages)
105105
suite.Nil(err)
@@ -117,9 +117,9 @@ func (suite *ConsumerTestSuite) TestConsumeMessages_OneMessageUnmarshallError()
117117
}
118118

119119
func (suite *ConsumerTestSuite) TestConsumeMessages_RealWorldScenarioWithErrors() {
120-
consumer := squeue.NewConsumer[*TestMessage](suite.driver)
121-
ctx := context.Background()
122120
queue := "test-queue"
121+
consumer := squeue.NewConsumer[*TestMessage](suite.driver, queue)
122+
ctx := context.Background()
123123

124124
dMessages := make(chan driver.Message)
125125
go func() {
@@ -147,7 +147,7 @@ func (suite *ConsumerTestSuite) TestConsumeMessages_RealWorldScenarioWithErrors(
147147
Consume(ctx, queue).
148148
Return(dMessages, nil)
149149

150-
messages, err := consumer.Consume(ctx, queue)
150+
messages, err := consumer.Consume(ctx)
151151

152152
suite.NotNil(messages)
153153
suite.Nil(err)

internal/examples/memory/main.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,19 +55,19 @@ func main() {
5555

5656
d := driver.NewMemoryDriver(time.Microsecond)
5757

58-
queue := squeue.NewProducer(d)
59-
consumer := squeue.NewConsumer[*myMessage](d)
58+
queue := squeue.NewProducer(d, "queue.test")
59+
consumer := squeue.NewConsumer[*myMessage](d, "queue.test")
6060

61-
events, err := consumer.Consume(ctx, "queue.test")
61+
events, err := consumer.Consume(ctx)
6262
if err != nil {
6363
panic(err)
6464
}
6565

66-
_ = queue.Enqueue("queue.test", &myMessage{"foo"})
67-
_ = queue.Enqueue("queue.test", &myMessage{"bar"})
68-
_ = queue.Enqueue("queue.test", &myMessage{"baz"})
66+
_ = queue.Enqueue(&myMessage{"foo"})
67+
_ = queue.Enqueue(&myMessage{"bar"})
68+
_ = queue.Enqueue(&myMessage{"baz"})
6969

70-
// Consumer gorouting
70+
// Consumer goroutine
7171
go func() {
7272
for evt := range events {
7373
log.Print("Received ", evt.Content)

internal/examples/sqs/consumer/consumer.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,10 @@ func main() {
5454
panic(err)
5555
}
5656

57-
sub := squeue.NewConsumer[*sqsexample.MyEvent](d)
5857
q := "test-simone"
58+
sub := squeue.NewConsumer[*sqsexample.MyEvent](d, q)
5959

60-
messages, err := sub.Consume(ctx, q)
60+
messages, err := sub.Consume(ctx)
6161
if err != nil {
6262
panic(err)
6363
}
@@ -72,7 +72,7 @@ func main() {
7272
}
7373

7474
log.Printf("Received %s", message.Content.Name)
75-
if err := sub.Ack(q, message); err != nil {
75+
if err := sub.Ack(message); err != nil {
7676
log.Print("Failed sending ack ", err)
7777
}
7878
}(m)

internal/examples/sqs/producer/producer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,11 @@ func main() {
3939
panic(err)
4040
}
4141

42-
pub := squeue.NewProducer(d)
42+
pub := squeue.NewProducer(d, "test-simone")
4343
tick := time.Tick(time.Second * 2)
4444

4545
for i := 0; ; i++ {
4646
<-tick
47-
_ = pub.Enqueue("test-simone", &sqsexample.MyEvent{Name: fmt.Sprintf("Message #%d", i)})
47+
_ = pub.Enqueue(&sqsexample.MyEvent{Name: fmt.Sprintf("Message #%d", i)})
4848
}
4949
}

producer.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,24 +6,25 @@ import (
66
"github.com/simodima/squeue/driver"
77
)
88

9-
func NewProducer(d driver.Driver) Producer {
9+
func NewProducer(d driver.Driver, queue string) Producer {
1010
return Producer{
1111
driver: d,
12+
queue: queue,
1213
}
1314
}
1415

1516
type Producer struct {
1617
driver driver.Driver
18+
queue string
1719
}
1820

19-
// Enqueue sends a message to the given queue
20-
// any provided options will be sent to the
21-
// underlying driver
22-
func (q *Producer) Enqueue(queue string, message json.Marshaler, opts ...func(message any)) error {
21+
// Enqueue sends a message to the underlying queue,
22+
// any provided options will be sent to the driver.
23+
func (q *Producer) Enqueue(message json.Marshaler, opts ...func(message any)) error {
2324
data, err := json.Marshal(message)
2425
if err != nil {
2526
return wrapErr(err, ErrMarshal, nil)
2627
}
2728

28-
return q.driver.Enqueue(queue, data, opts...)
29+
return q.driver.Enqueue(q.queue, data, opts...)
2930
}

producer_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,45 +39,45 @@ func (suite *QueueTestSuite) TearDownTest() {
3939
}
4040

4141
func (suite *QueueTestSuite) TestNewProducer() {
42-
squeue.NewProducer(suite.driver)
42+
squeue.NewProducer(suite.driver, "test-queue")
4343
}
4444

4545
func (suite *QueueTestSuite) TestEnqueueMessage_DriverError() {
4646
queue := "test-queue"
47-
producer := squeue.NewProducer(suite.driver)
47+
producer := squeue.NewProducer(suite.driver, queue)
4848

4949
suite.driver.
5050
EXPECT().
5151
Enqueue(queue, []byte(`{"name":"test message"}`)).
5252
Return(errors.New("producer error"))
5353

54-
err := producer.Enqueue(queue, &TestMessage{Name: "test message"})
54+
err := producer.Enqueue(&TestMessage{Name: "test message"})
5555
suite.Error(err)
5656
}
5757

5858
func (suite *QueueTestSuite) TestEnqueueMessage_MarshalingError() {
5959
queue := "test-queue"
60-
producer := squeue.NewProducer(suite.driver)
60+
producer := squeue.NewProducer(suite.driver, queue)
6161

6262
suite.driver.
6363
EXPECT().
6464
Enqueue(queue, nil).
6565
Times(0)
6666

67-
err := producer.Enqueue(queue, &wrongMessage{})
67+
err := producer.Enqueue(&wrongMessage{})
6868
suite.Error(err)
6969
}
7070

7171
func (suite *QueueTestSuite) TestEnqueueMessage() {
7272
queue := "test-queue"
73-
producer := squeue.NewProducer(suite.driver)
73+
producer := squeue.NewProducer(suite.driver, queue)
7474
message := &TestMessage{Name: "test message"}
7575
suite.driver.
7676
EXPECT().
7777
Enqueue(queue, []byte(`{"name":"test message"}`)).
7878
Return(nil)
7979

80-
err := producer.Enqueue(queue, message)
80+
err := producer.Enqueue(message)
8181
suite.Nil(err)
8282
}
8383

0 commit comments

Comments
 (0)