From 110036f00934694ba873dbfd27f977ee122a431c Mon Sep 17 00:00:00 2001 From: krishnamurthypranesh Date: Fri, 6 May 2022 14:32:49 +0530 Subject: [PATCH 1/5] :sparkles: Add support for bulk publishing of messages --- publisher/sns/sns.go | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/publisher/sns/sns.go b/publisher/sns/sns.go index a089bed..010b9e7 100644 --- a/publisher/sns/sns.go +++ b/publisher/sns/sns.go @@ -15,6 +15,7 @@ import ( // Publisher.service and interface that we can mock for testing. type sender interface { PublishWithContext(ctx context.Context, input *sns.PublishInput, o ...request.Option) (*sns.PublishOutput, error) + PublishBatchWithContext(ctx context.Context, input *sns.PublishBatchInput, o ...request.Option) (*sns.PublishBatchOutput, error) } // Config holds the info required to work with AWS SNS @@ -58,6 +59,40 @@ func (p *Publisher) Publish(ctx context.Context, msg interface{}) error { return err } +func (p *Publisher) PublishBatch(ctx context.Context, msgs []interface{}) error { + defaultMessageGroupID := "default" + + requestEntries := make([]*sns.PublishBatchRequestEntry, 0) + + isFifo := strings.Contains(strings.ToLower(p.cfg.TopicArn), "fifo") + + for _, msg := range msgs { + b, err := json.Marshal(msg) + if err != nil { + return err + } + + requestEntry := &sns.PublishBatchRequestEntry{ + Message: aws.String(string(b)), + } + + if isFifo { + requestEntry.MessageGroupId = &defaultMessageGroupID + } + + requestEntries = append(requestEntries, requestEntry) + } + + input := &sns.PublishBatchInput{ + PublishBatchRequestEntries: requestEntries, + TopicArn: &p.cfg.TopicArn, + } + + _, err := p.sns.PublishBatchWithContext(ctx, input) + + return err +} + func defaultPublisherConfig(cfg *Config) { if cfg.AWSSession == nil { cfg.AWSSession = session.Must(session.NewSession()) From f54ea32388f443e1436557064a3384a6cc807dab Mon Sep 17 00:00:00 2001 From: krishnamurthypranesh Date: Fri, 6 May 2022 14:33:19 +0530 Subject: [PATCH 2/5] :white_check_mark: Add mock and test for bulk publish function --- publisher/sns/mock_test.go | 7 +++++++ publisher/sns/sns_test.go | 25 +++++++++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/publisher/sns/mock_test.go b/publisher/sns/mock_test.go index bc96cea..61d3904 100644 --- a/publisher/sns/mock_test.go +++ b/publisher/sns/mock_test.go @@ -15,3 +15,10 @@ func (p *snsPublisherMock) PublishWithContext(ctx context.Context, input *sns.Pu p.queue <- input.Message return &sns.PublishOutput{}, nil } + +func (p *snsPublisherMock) PublishBatchWithContext(ctx context.Context, input *sns.PublishBatchInput, o ...request.Option) (*sns.PublishBatchOutput, error) { + for _, entry := range input.PublishBatchRequestEntries { + p.queue <- entry.Message + } + return &sns.PublishBatchOutput{}, nil +} diff --git a/publisher/sns/sns_test.go b/publisher/sns/sns_test.go index d519461..b0feddf 100644 --- a/publisher/sns/sns_test.go +++ b/publisher/sns/sns_test.go @@ -26,6 +26,31 @@ func TestPublisher(t *testing.T) { require.Equal(t, *publishedMessage, `{"msg":"message"}`) } +func TestPublisherBatch(t *testing.T) { + inputs := []interface{}{ + jsonString(`{"key":"val1"}`), + jsonString(`{"key":"val2"}`), + } + + queue := make(chan *string, len(inputs)) + defer close(queue) + + pubs := New(Config{}) + pubs.sns = &snsPublisherMock{queue: queue} + + require.NoError(t, pubs.PublishBatch(context.TODO(), inputs)) + + idx := 0 + for v := range queue { + publishedMessage := *v + require.Equal(t, jsonString(publishedMessage), inputs[idx]) + idx++ + if idx >= len(inputs) { + break + } + } +} + func TestPublisherDefaults(t *testing.T) { tt := []struct { From 20fe59e93d53255c8fa7598293cb39f257e5aff8 Mon Sep 17 00:00:00 2001 From: krishnamurthypranesh Date: Fri, 6 May 2022 14:57:19 +0530 Subject: [PATCH 3/5] :necktie: Add logic for scrolling through input message set --- publisher/sns/sns.go | 72 ++++++++++++++++++++++++++++++++------------ 1 file changed, 52 insertions(+), 20 deletions(-) diff --git a/publisher/sns/sns.go b/publisher/sns/sns.go index 010b9e7..57c9ad0 100644 --- a/publisher/sns/sns.go +++ b/publisher/sns/sns.go @@ -59,37 +59,69 @@ func (p *Publisher) Publish(ctx context.Context, msg interface{}) error { return err } +// PublishBatch allows SNS Publisher to implement the publisher.Publisher interface +// and publish messages in a single batch to an AWS SNS backend. Since AWS SNS batch +// publish can only handle a maximum payload of 10 messages at a time, the messages +// supplied will be published in batches of 10. For this reason, message sets are best +// kept under 100 messages so that all messages can be published in 10 tries. In case +// of failure when parsing or publishing any of the messages, this function will stop +// further publishing and return an error func (p *Publisher) PublishBatch(ctx context.Context, msgs []interface{}) error { - defaultMessageGroupID := "default" - - requestEntries := make([]*sns.PublishBatchRequestEntry, 0) + var ( + defaultMessageGroupID = "default" + err error + ) isFifo := strings.Contains(strings.ToLower(p.cfg.TopicArn), "fifo") - for _, msg := range msgs { - b, err := json.Marshal(msg) - if err != nil { - return err + var ( + numPublishedMessages = 0 + start = 0 + end = 10 // 10 is the maximum batch size for SNS.PublishBatch + ) + if end > len(msgs) { + end = len(msgs) + } + for numPublishedMessages < len(msgs) { + var ( + requestEntries = make([]*sns.PublishBatchRequestEntry, 0) + ) + for idx := start; idx < end; idx++ { + msg := msgs[idx] + + b, err := json.Marshal(msg) + if err != nil { + return err + } + + requestEntry := &sns.PublishBatchRequestEntry{ + Message: aws.String(string(b)), + } + + if isFifo { + requestEntry.MessageGroupId = &defaultMessageGroupID + } + + requestEntries = append(requestEntries, requestEntry) } - requestEntry := &sns.PublishBatchRequestEntry{ - Message: aws.String(string(b)), + input := &sns.PublishBatchInput{ + PublishBatchRequestEntries: requestEntries, + TopicArn: &p.cfg.TopicArn, } - - if isFifo { - requestEntry.MessageGroupId = &defaultMessageGroupID + _, err = p.sns.PublishBatchWithContext(ctx, input) + if err != nil { + return err } - requestEntries = append(requestEntries, requestEntry) - } - - input := &sns.PublishBatchInput{ - PublishBatchRequestEntries: requestEntries, - TopicArn: &p.cfg.TopicArn, + numPublishedMessages += len(requestEntries) + start = end + end += 10 + if end > len(msgs) { + end = len(msgs) + } } - _, err := p.sns.PublishBatchWithContext(ctx, input) - return err } From 3043634dad6618443c43496fecc4f8144664c3f2 Mon Sep 17 00:00:00 2001 From: krishnamurthypranesh Date: Fri, 6 May 2022 19:31:48 +0530 Subject: [PATCH 4/5] :heavy_plus_sign: Add uuid as dependency --- go.mod | 1 + go.sum | 2 ++ 2 files changed, 3 insertions(+) diff --git a/go.mod b/go.mod index db368c3..3845158 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( require ( github.com/davecgh/go-spew v1.1.0 // indirect + github.com/google/uuid v1.3.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect diff --git a/go.sum b/go.sum index 85cd547..edbd508 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,8 @@ github.com/aws/aws-sdk-go v1.43.24 h1:7c2PniJ0wpmWsIA6OtYBw6wS7DF0IjbhvPq+0ZQYNX github.com/aws/aws-sdk-go v1.43.24/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= From 144984961650516e201c643b2bc87cbb16ee9062 Mon Sep 17 00:00:00 2001 From: krishnamurthypranesh Date: Fri, 6 May 2022 19:32:14 +0530 Subject: [PATCH 5/5] :adhesive_bandage: Add id to batch request entry --- publisher/sns/sns.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/publisher/sns/sns.go b/publisher/sns/sns.go index 57c9ad0..e215fd6 100644 --- a/publisher/sns/sns.go +++ b/publisher/sns/sns.go @@ -9,6 +9,7 @@ import ( "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sns" + "github.com/google/uuid" ) // sender is the interface to sns.SNS. Its sole purpose is to make @@ -94,7 +95,9 @@ func (p *Publisher) PublishBatch(ctx context.Context, msgs []interface{}) error return err } + entryId := uuid.New().String() requestEntry := &sns.PublishBatchRequestEntry{ + Id: aws.String(entryId), Message: aws.String(string(b)), }