Skip to content

Commit e33ab37

Browse files
authored
Add resoureces to Publisher (#18)
1 parent 9e2d7ef commit e33ab37

File tree

3 files changed

+29
-9
lines changed

3 files changed

+29
-9
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ env:
1515
up:
1616
@docker compose up -d
1717
@./scripts/wait-for-sqs.sh 20
18-
@docker compose exec postgres /scripts/wait-for-postgres.sh 30
18+
@docker compose exec -T postgres /scripts/wait-for-postgres.sh 30
1919

2020
.PHONY: down
2121
down:

aws/publisher.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"sync"
78

89
"github.com/aws/aws-sdk-go-v2/service/sns"
910
"github.com/aws/aws-sdk-go-v2/service/sqs"
@@ -17,24 +18,27 @@ var ErrTopicOrQueueNotFound = errors.New("could not find neither topic ARN nor q
1718
type Publisher struct {
1819
sns *sns.Client
1920
sqs *sqs.Client
20-
resources map[string]string
21+
resources sync.Map
2122
}
2223

2324
// NewPublisher creates a new SNS+SQS publisher.
2425
func NewPublisher(sns *sns.Client, sqs *sqs.Client, resources map[string]string) *Publisher {
25-
return &Publisher{
26-
sns: sns,
27-
sqs: sqs,
28-
resources: resources,
26+
p := &Publisher{
27+
sns: sns,
28+
sqs: sqs,
2929
}
30+
for k, v := range resources {
31+
p.resources.Store(k, v)
32+
}
33+
return p
3034
}
3135

3236
// Publish a message trough SNS.
3337
func (p *Publisher) Publish(ctx context.Context, resourceID string, envelopes ...*pubsub.Envelope) error {
3438
// If the resource exists we get it, otherwise we use the identifier.
35-
resource, _ := p.resources[resourceID]
36-
if resource == "" {
37-
resource = resourceID
39+
resource := resourceID
40+
if r, ok := p.resources.Load(resourceID); ok {
41+
resource = r.(string)
3842
}
3943

4044
// Note: topic ARN "are" technically URLs, so this check need to go first.
@@ -49,6 +53,10 @@ func (p *Publisher) Publish(ctx context.Context, resourceID string, envelopes ..
4953
return fmt.Errorf("%w: %s", ErrTopicOrQueueNotFound, resource)
5054
}
5155

56+
func (p *Publisher) AddResource(resourceID, resource string) {
57+
p.resources.Store(resourceID, resource)
58+
}
59+
5260
func publishSNSMessage(ctx context.Context, c *sns.Client, topicARN string, envelopes ...*pubsub.Envelope) error {
5361
for _, env := range envelopes {
5462
// every FIFO queue message needs to have a message group in SNS

aws/publisher_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,18 @@ func TestPublisher(t *testing.T) {
7777

7878
requireReceivedEnvelope(t, msgs, env)
7979
})
80+
81+
t.Run("a resource can be added", func(t *testing.T) {
82+
pub := NewPublisher(snsTest, sqsTest, nil)
83+
pub.AddResource("my-topic-alias", topicARN)
84+
85+
err := pub.Publish(ctx, "my-topic-alias", env)
86+
if err != nil {
87+
t.Fatalf("unexpected error publishing message; got %v", err)
88+
}
89+
90+
requireReceivedEnvelope(t, msgs, env)
91+
})
8092
}
8193

8294
func requireReceivedEnvelope(t *testing.T, msgs <-chan pubsub.Next, env *pubsub.Envelope) {

0 commit comments

Comments
 (0)