diff --git a/README.md b/README.md index fc372a8..1594763 100644 --- a/README.md +++ b/README.md @@ -1,60 +1,58 @@ - - - - S-queue mascotte + + + + S-queue mascot -**s-queue** is a library to interact with Amazon SQS queues with 100% type safety, powered by golang generics. - +**squeue** is a Golang library designed to facilitate interactions with Amazon SQS queues, providing 100% type-safety powered by Golang generics. + --- - -[![codecov](https://codecov.io/github/simodima/squeue/graph/badge.svg?token=DW7C57P2VW)](https://codecov.io/github/simodima/squeue) -[![Go Report Card](https://goreportcard.com/badge/github.com/simodima/squeue)](https://goreportcard.com/report/github.com/simodima/squeue) + +[![codecov](https://codecov.io/github/simodima/squeue/graph/badge.svg?token=DW7C57P2VW)](https://codecov.io/github/simodima/squeue) +[![Go Report Card](https://goreportcard.com/badge/github.com/simodima/squeue)](https://goreportcard.com/report/github.com/simodima/squeue) [![Go Reference](https://pkg.go.dev/badge/github.com/simodima/squeue.svg)](https://pkg.go.dev/github.com/simodima/squeue) -The library provides the following features: +## Key Features -### Driver based implementation -The interface exposed by the library abstract the internal driver implementation, that allows to plug various different drivers depending of the development needs and the environment. +### Driver-based Implementation +squeue abstracts the internal driver implementation through its interface, allowing for seamless integration of various drivers to suit different development environments and needs. -The currently implemented drivers are : -- In Memory +Currently implemented drivers: +- In-Memory - Amazon SQS -You can easly use the *Amazon SQS* driver in any staging/production environment and use the *In Memory* one for a lightweight testing environment. +You can easily deploy the *Amazon SQS* driver in any staging or production environment and use the *In-Memory* driver for lightweight testing. ### Type Safety -The library uses an opinionated JSON format to serialize/deserialize the messages. To be 100% typesafe you just need to provide a message type implementing the golang `json.Marshaler` and `json.Unmarshaler` interfaces; in that way the raw data serialization/deserialization is in your direct control. +The library uses an opinionated JSON format for serializing and deserializing messages. To achieve 100% type safety, simply provide a message type that implements the Golang `json.Marshaler` and `json.Unmarshaler` interfaces. This way, you maintain direct control over the raw data serialization and deserialization. -### Channel of messages -Consuming message is as easy as looping a channel of messages +### Channel of Messages +Consuming messages is straightforward—just loop through a channel of messages: ```golang - messages, _ := sub.Consume(ctx, "queue") - for m := range messages { - go func(message squeue.Message[*my.Message]) { - // Do the magic... - }(m) - } +messages, _ := sub.Consume(ctx, "queue") +for m := range messages { + go func(message squeue.Message[*my.Message]) { + // Do your magic... + }(m) +} ```
+ Examples -Examples - -For a more clear documentation look at the `internal/examples/` directory + For more detailed documentation, please refer to the `internal/examples/` directory. -**In Memory driver** -```bash -go run internal/examples/memory/main.go -``` - -**Amazon SQS driver** -```bash -go run internal/examples/sqs/consumer/consumer.go + **In-Memory Driver** + ```bash + go run internal/examples/memory/main.go + ``` -## in a different shell ↓ -go run internal/examples/sqs/producer/producer.go -``` + **Amazon SQS Driver** + ```bash + go run internal/examples/sqs/consumer/consumer.go + # Open another shell + go run internal/examples/sqs/producer/producer.go + ``` -
+ \ No newline at end of file diff --git a/consumer.go b/consumer.go index 739e7d4..906244f 100644 --- a/consumer.go +++ b/consumer.go @@ -65,6 +65,11 @@ func (p *Consumer[T]) Consume(opts ...func(message any)) (chan Message[T], error return outMsg, nil } +// Ping runs a ping on the underlying driver +func (p Consumer[T]) Ping() error { + return p.driver.Ping() +} + func (p *Consumer[T]) Stop() { if p.controller != nil { p.controller.Stop() diff --git a/consumer_test.go b/consumer_test.go index 91a8196..76c288f 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -33,6 +33,19 @@ func (suite *ConsumerTestSuite) TestNewConsumer() { squeue.NewConsumer[*TestMessage](suite.driver, "test-queue") } +func (suite *ConsumerTestSuite) TestPing() { + queue := "test-queue" + consumer := squeue.NewConsumer[*TestMessage](suite.driver, queue) + + suite.driver. + EXPECT(). + Ping(). + Return(errors.New("consume error")) + + err := consumer.Ping() + suite.Error(err) +} + func (suite *ConsumerTestSuite) TestConsumeMessages_DriverError() { queue := "test-queue" consumer := squeue.NewConsumer[*TestMessage](suite.driver, queue) diff --git a/driver/driver.go b/driver/driver.go index a8254eb..f20c55d 100644 --- a/driver/driver.go +++ b/driver/driver.go @@ -33,4 +33,5 @@ type Driver interface { Enqueue(queue string, data []byte, opts ...func(message any)) error Consume(queue string, opts ...func(message any)) (*ConsumerController, error) Ack(queue string, messageID string) error + Ping() error } diff --git a/driver/memdriver.go b/driver/memdriver.go index eec43ec..ac417d2 100644 --- a/driver/memdriver.go +++ b/driver/memdriver.go @@ -76,3 +76,7 @@ func (d *MemoryDriver) pop(queue string) *Message { return nil } + +func (d *MemoryDriver) Ping() error { + return nil +} diff --git a/driver/memdriver_test.go b/driver/memdriver_test.go index 6e479c4..e7c636c 100644 --- a/driver/memdriver_test.go +++ b/driver/memdriver_test.go @@ -29,6 +29,11 @@ func (suite *MemoryTestSuite) TestAckDONotBreakAnything() { suite.Nil(d.Ack("test", "1")) } +func (suite *MemoryTestSuite) TestPingDONotBreakAnything() { + d := driver.NewMemoryDriver(time.Millisecond) + suite.Nil(d.Ping()) +} + func (suite *MemoryTestSuite) TestEnqueueDequeueSuccess() { d := driver.NewMemoryDriver(time.Millisecond) diff --git a/driver_test.go b/driver_test.go index 77b7707..c0c95c1 100644 --- a/driver_test.go +++ b/driver_test.go @@ -86,3 +86,17 @@ func (mr *MockDriverMockRecorder) Enqueue(queue, data interface{}, opts ...inter varargs := append([]interface{}{queue, data}, opts...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Enqueue", reflect.TypeOf((*MockDriver)(nil).Enqueue), varargs...) } + +// Ping mocks base method. +func (m *MockDriver) Ping() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Ping") + ret0, _ := ret[0].(error) + return ret0 +} + +// Ping indicates an expected call of Ping. +func (mr *MockDriverMockRecorder) Ping() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ping", reflect.TypeOf((*MockDriver)(nil).Ping)) +} diff --git a/internal/examples/sqs/consumer/consumer.go b/internal/examples/sqs/consumer/consumer.go index 220a2c7..f63bacf 100644 --- a/internal/examples/sqs/consumer/consumer.go +++ b/internal/examples/sqs/consumer/consumer.go @@ -6,6 +6,7 @@ import ( "os/signal" "sync" "syscall" + "time" "github.com/joho/godotenv" "github.com/simodima/squeue" @@ -61,12 +62,24 @@ func main() { messages, err := sub.Consume( sqs.WithConsumeMaxNumberOfMessages(10), - sqs.WithConsumeWaitTimeSeconds(20), + sqs.WithConsumeWaitTimeSeconds(2), ) if err != nil { panic(err) } + // Ping the sqs to check connectivity with AWS + go func() { + for { + <-time.After(time.Second * 10) + if err := sub.Ping(); err != nil { + log.Println("Ping failed: " + err.Error()) + } else { + log.Println("Ping OK") + } + } + }() + log.Print("Waiting for consuming") wg := &sync.WaitGroup{} diff --git a/sqs/mocks/sqsclient.go b/sqs/mocks/sqsclient.go index da02463..5915ab0 100644 --- a/sqs/mocks/sqsclient.go +++ b/sqs/mocks/sqsclient.go @@ -49,19 +49,19 @@ func (mr *MocksqsClientMockRecorder) DeleteMessage(input interface{}) *gomock.Ca return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteMessage", reflect.TypeOf((*MocksqsClient)(nil).DeleteMessage), input) } -// ListQueues mocks base method. -func (m *MocksqsClient) ListQueues(input *sqs.ListQueuesInput) (*sqs.ListQueuesOutput, error) { +// GetQueueAttributes mocks base method. +func (m *MocksqsClient) GetQueueAttributes(input *sqs.GetQueueAttributesInput) (*sqs.GetQueueAttributesOutput, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ListQueues", input) - ret0, _ := ret[0].(*sqs.ListQueuesOutput) + ret := m.ctrl.Call(m, "GetQueueAttributes", input) + ret0, _ := ret[0].(*sqs.GetQueueAttributesOutput) ret1, _ := ret[1].(error) return ret0, ret1 } -// ListQueues indicates an expected call of ListQueues. -func (mr *MocksqsClientMockRecorder) ListQueues(input interface{}) *gomock.Call { +// GetQueueAttributes indicates an expected call of GetQueueAttributes. +func (mr *MocksqsClientMockRecorder) GetQueueAttributes(input interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListQueues", reflect.TypeOf((*MocksqsClient)(nil).ListQueues), input) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetQueueAttributes", reflect.TypeOf((*MocksqsClient)(nil).GetQueueAttributes), input) } // ReceiveMessage mocks base method. diff --git a/sqs/sqs.go b/sqs/sqs.go index b159822..de2afdd 100644 --- a/sqs/sqs.go +++ b/sqs/sqs.go @@ -17,7 +17,7 @@ type sqsClient interface { DeleteMessage(input *sqs.DeleteMessageInput) (*sqs.DeleteMessageOutput, error) SendMessage(input *sqs.SendMessageInput) (*sqs.SendMessageOutput, error) ReceiveMessage(input *sqs.ReceiveMessageInput) (*sqs.ReceiveMessageOutput, error) - ListQueues(input *sqs.ListQueuesInput) (*sqs.ListQueuesOutput, error) + GetQueueAttributes(input *sqs.GetQueueAttributesInput) (*sqs.GetQueueAttributesOutput, error) } type Driver struct { @@ -51,7 +51,7 @@ func New(options ...Option) (*Driver, error) { } if driver.testConnectionOnStartup { - if err := driver.testConnection(); err != nil { + if err := driver.Ping(); err != nil { return nil, err } } @@ -87,8 +87,3 @@ func createClient(queueUrl string, region string, clientCredentials *credentials return sqs.New(session.Must(session.NewSessionWithOptions(options))), nil } - -func (d *Driver) testConnection() error { - _, err := d.sqsClient.ListQueues(&sqs.ListQueuesInput{}) - return err -} diff --git a/sqs/sqs_ping.go b/sqs/sqs_ping.go new file mode 100644 index 0000000..f2424fa --- /dev/null +++ b/sqs/sqs_ping.go @@ -0,0 +1,14 @@ +package sqs + +import ( + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/sqs" +) + +func (d *Driver) Ping() error { + _, err := d.sqsClient.GetQueueAttributes(&sqs.GetQueueAttributesInput{ + AttributeNames: []*string{aws.String("All")}, + QueueUrl: aws.String(d.url), + }) + return err +} diff --git a/sqs/sqs_test.go b/sqs/sqs_test.go index 183e46b..7adf42f 100644 --- a/sqs/sqs_test.go +++ b/sqs/sqs_test.go @@ -58,6 +58,16 @@ func (suite *SQSTestSuite) TestNewWithDefaultOptions() { suite.Contains(err.Error(), "missing") } +func (suite *SQSTestSuite) TestNew_InvalidQueueURL() { + os.Setenv("AWS_SHARED_CREDENTIALS_FILE", "/a/file") + _, err := sqs.New( + sqs.WithUrl("-"), + ) + + suite.Error(err) + suite.Contains(err.Error(), "invalid URI") +} + func (suite *SQSTestSuite) TestNewWithAClient() { sqsDriver, err := sqs.New(sqs.WithClient(suite.sqsMock)) @@ -68,12 +78,16 @@ func (suite *SQSTestSuite) TestNewWithAClient() { func (suite *SQSTestSuite) TestNewAutoTestConnectionSuccess() { suite.sqsMock. EXPECT(). - ListQueues(&awssqs.ListQueuesInput{}). - Return(nil, nil) + GetQueueAttributes(&awssqs.GetQueueAttributesInput{ + AttributeNames: []*string{aws.String("All")}, + QueueUrl: aws.String("aws-sqs-queue-url"), + }). + Return(&awssqs.GetQueueAttributesOutput{}, nil) sqsDriver, err := sqs.New( sqs.WithClient(suite.sqsMock), sqs.AutoTestConnection(), + sqs.WithUrl("aws-sqs-queue-url"), ) suite.Nil(err) @@ -83,12 +97,16 @@ func (suite *SQSTestSuite) TestNewAutoTestConnectionSuccess() { func (suite *SQSTestSuite) TestNewAutoTestConnectionFail() { suite.sqsMock. EXPECT(). - ListQueues(&awssqs.ListQueuesInput{}). + GetQueueAttributes(&awssqs.GetQueueAttributesInput{ + AttributeNames: []*string{aws.String("All")}, + QueueUrl: aws.String("aws-sqs-queue-url"), + }). Return(nil, errors.New("error calling aws")) sqsDriver, err := sqs.New( sqs.WithClient(suite.sqsMock), sqs.AutoTestConnection(), + sqs.WithUrl("aws-sqs-queue-url"), ) suite.NotNil(err)