Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 37 additions & 39 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,60 +1,58 @@
<picture>
<source media="(prefers-color-scheme: dark)" srcset="./docs/squeue-white.png">
<source media="(prefers-color-scheme: light)" srcset="./docs/squeue-black.png">
<img alt="S-queue mascotte" src="./docs/squeue-black.png">
<picture>
<source media="(prefers-color-scheme: dark)" srcset="./docs/squeue-white.png">
<source media="(prefers-color-scheme: light)" srcset="./docs/squeue-black.png">
<img alt="S-queue mascot" src="./docs/squeue-black.png">
</picture>

**s-queue** is a library to interact with Amazon SQS queues with 100% type safety, powered by golang generics.

**squeue** is a Golang library designed to facilitate interactions with Amazon SQS queues, providing 100% type-safety powered by Golang generics.
---

[![codecov](https://codecov.io/github/simodima/squeue/graph/badge.svg?token=DW7C57P2VW)](https://codecov.io/github/simodima/squeue)
[![Go Report Card](https://goreportcard.com/badge/github.com/simodima/squeue)](https://goreportcard.com/report/github.com/simodima/squeue)
[![codecov](https://codecov.io/github/simodima/squeue/graph/badge.svg?token=DW7C57P2VW)](https://codecov.io/github/simodima/squeue)
[![Go Report Card](https://goreportcard.com/badge/github.com/simodima/squeue)](https://goreportcard.com/report/github.com/simodima/squeue)
[![Go Reference](https://pkg.go.dev/badge/github.com/simodima/squeue.svg)](https://pkg.go.dev/github.com/simodima/squeue)

The library provides the following features:
## Key Features

### Driver based implementation
The interface exposed by the library abstract the internal driver implementation, that allows to plug various different drivers depending of the development needs and the environment.
### Driver-based Implementation
squeue abstracts the internal driver implementation through its interface, allowing for seamless integration of various drivers to suit different development environments and needs.

The currently implemented drivers are :
- In Memory
Currently implemented drivers:
- In-Memory
- Amazon SQS

You can easly use the *Amazon SQS* driver in any staging/production environment and use the *In Memory* one for a lightweight testing environment.
You can easily deploy the *Amazon SQS* driver in any staging or production environment and use the *In-Memory* driver for lightweight testing.

### Type Safety
The library uses an opinionated JSON format to serialize/deserialize the messages. To be 100% typesafe you just need to provide a message type implementing the golang `json.Marshaler` and `json.Unmarshaler` interfaces; in that way the raw data serialization/deserialization is in your direct control.
The library uses an opinionated JSON format for serializing and deserializing messages. To achieve 100% type safety, simply provide a message type that implements the Golang `json.Marshaler` and `json.Unmarshaler` interfaces. This way, you maintain direct control over the raw data serialization and deserialization.

### Channel of messages
Consuming message is as easy as looping a channel of messages
### Channel of Messages
Consuming messages is straightforward—just loop through a channel of messages:

```golang
messages, _ := sub.Consume(ctx, "queue")
for m := range messages {
go func(message squeue.Message[*my.Message]) {
// Do the magic...
}(m)
}
messages, _ := sub.Consume(ctx, "queue")
for m := range messages {
go func(message squeue.Message[*my.Message]) {
// Do your magic...
}(m)
}
```

<details>
<summary>Examples</summary>

<summary>Examples</summary>

For a more clear documentation look at the `internal/examples/` directory
For more detailed documentation, please refer to the `internal/examples/` directory.

**In Memory driver**
```bash
go run internal/examples/memory/main.go
```

**Amazon SQS driver**
```bash
go run internal/examples/sqs/consumer/consumer.go
**In-Memory Driver**
```bash
go run internal/examples/memory/main.go
```

## in a different shell ↓
go run internal/examples/sqs/producer/producer.go
```
**Amazon SQS Driver**
```bash
go run internal/examples/sqs/consumer/consumer.go
# Open another shell
go run internal/examples/sqs/producer/producer.go
```

</details>
</details>
5 changes: 5 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ func (p *Consumer[T]) Consume(opts ...func(message any)) (chan Message[T], error
return outMsg, nil
}

// Ping runs a ping on the underlying driver
func (p Consumer[T]) Ping() error {
return p.driver.Ping()
}

func (p *Consumer[T]) Stop() {
if p.controller != nil {
p.controller.Stop()
Expand Down
13 changes: 13 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,19 @@ func (suite *ConsumerTestSuite) TestNewConsumer() {
squeue.NewConsumer[*TestMessage](suite.driver, "test-queue")
}

func (suite *ConsumerTestSuite) TestPing() {
queue := "test-queue"
consumer := squeue.NewConsumer[*TestMessage](suite.driver, queue)

suite.driver.
EXPECT().
Ping().
Return(errors.New("consume error"))

err := consumer.Ping()
suite.Error(err)
}

func (suite *ConsumerTestSuite) TestConsumeMessages_DriverError() {
queue := "test-queue"
consumer := squeue.NewConsumer[*TestMessage](suite.driver, queue)
Expand Down
1 change: 1 addition & 0 deletions driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ type Driver interface {
Enqueue(queue string, data []byte, opts ...func(message any)) error
Consume(queue string, opts ...func(message any)) (*ConsumerController, error)
Ack(queue string, messageID string) error
Ping() error
}
4 changes: 4 additions & 0 deletions driver/memdriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,7 @@ func (d *MemoryDriver) pop(queue string) *Message {

return nil
}

func (d *MemoryDriver) Ping() error {
return nil
}
5 changes: 5 additions & 0 deletions driver/memdriver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ func (suite *MemoryTestSuite) TestAckDONotBreakAnything() {
suite.Nil(d.Ack("test", "1"))
}

func (suite *MemoryTestSuite) TestPingDONotBreakAnything() {
d := driver.NewMemoryDriver(time.Millisecond)
suite.Nil(d.Ping())
}

func (suite *MemoryTestSuite) TestEnqueueDequeueSuccess() {
d := driver.NewMemoryDriver(time.Millisecond)

Expand Down
14 changes: 14 additions & 0 deletions driver_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 14 additions & 1 deletion internal/examples/sqs/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os/signal"
"sync"
"syscall"
"time"

"github.com/joho/godotenv"
"github.com/simodima/squeue"
Expand Down Expand Up @@ -61,12 +62,24 @@ func main() {

messages, err := sub.Consume(
sqs.WithConsumeMaxNumberOfMessages(10),
sqs.WithConsumeWaitTimeSeconds(20),
sqs.WithConsumeWaitTimeSeconds(2),
)
if err != nil {
panic(err)
}

// Ping the sqs to check connectivity with AWS
go func() {
for {
<-time.After(time.Second * 10)
if err := sub.Ping(); err != nil {
log.Println("Ping failed: " + err.Error())
} else {
log.Println("Ping OK")
}
}
}()

log.Print("Waiting for consuming")
wg := &sync.WaitGroup{}

Expand Down
14 changes: 7 additions & 7 deletions sqs/mocks/sqsclient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 2 additions & 7 deletions sqs/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type sqsClient interface {
DeleteMessage(input *sqs.DeleteMessageInput) (*sqs.DeleteMessageOutput, error)
SendMessage(input *sqs.SendMessageInput) (*sqs.SendMessageOutput, error)
ReceiveMessage(input *sqs.ReceiveMessageInput) (*sqs.ReceiveMessageOutput, error)
ListQueues(input *sqs.ListQueuesInput) (*sqs.ListQueuesOutput, error)
GetQueueAttributes(input *sqs.GetQueueAttributesInput) (*sqs.GetQueueAttributesOutput, error)
}

type Driver struct {
Expand Down Expand Up @@ -51,7 +51,7 @@ func New(options ...Option) (*Driver, error) {
}

if driver.testConnectionOnStartup {
if err := driver.testConnection(); err != nil {
if err := driver.Ping(); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -87,8 +87,3 @@ func createClient(queueUrl string, region string, clientCredentials *credentials

return sqs.New(session.Must(session.NewSessionWithOptions(options))), nil
}

func (d *Driver) testConnection() error {
_, err := d.sqsClient.ListQueues(&sqs.ListQueuesInput{})
return err
}
14 changes: 14 additions & 0 deletions sqs/sqs_ping.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package sqs

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sqs"
)

func (d *Driver) Ping() error {
_, err := d.sqsClient.GetQueueAttributes(&sqs.GetQueueAttributesInput{
AttributeNames: []*string{aws.String("All")},
QueueUrl: aws.String(d.url),
})
return err
}
24 changes: 21 additions & 3 deletions sqs/sqs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ func (suite *SQSTestSuite) TestNewWithDefaultOptions() {
suite.Contains(err.Error(), "missing")
}

func (suite *SQSTestSuite) TestNew_InvalidQueueURL() {
os.Setenv("AWS_SHARED_CREDENTIALS_FILE", "/a/file")
_, err := sqs.New(
sqs.WithUrl("-"),
)

suite.Error(err)
suite.Contains(err.Error(), "invalid URI")
}

func (suite *SQSTestSuite) TestNewWithAClient() {
sqsDriver, err := sqs.New(sqs.WithClient(suite.sqsMock))

Expand All @@ -68,12 +78,16 @@ func (suite *SQSTestSuite) TestNewWithAClient() {
func (suite *SQSTestSuite) TestNewAutoTestConnectionSuccess() {
suite.sqsMock.
EXPECT().
ListQueues(&awssqs.ListQueuesInput{}).
Return(nil, nil)
GetQueueAttributes(&awssqs.GetQueueAttributesInput{
AttributeNames: []*string{aws.String("All")},
QueueUrl: aws.String("aws-sqs-queue-url"),
}).
Return(&awssqs.GetQueueAttributesOutput{}, nil)

sqsDriver, err := sqs.New(
sqs.WithClient(suite.sqsMock),
sqs.AutoTestConnection(),
sqs.WithUrl("aws-sqs-queue-url"),
)

suite.Nil(err)
Expand All @@ -83,12 +97,16 @@ func (suite *SQSTestSuite) TestNewAutoTestConnectionSuccess() {
func (suite *SQSTestSuite) TestNewAutoTestConnectionFail() {
suite.sqsMock.
EXPECT().
ListQueues(&awssqs.ListQueuesInput{}).
GetQueueAttributes(&awssqs.GetQueueAttributesInput{
AttributeNames: []*string{aws.String("All")},
QueueUrl: aws.String("aws-sqs-queue-url"),
}).
Return(nil, errors.New("error calling aws"))

sqsDriver, err := sqs.New(
sqs.WithClient(suite.sqsMock),
sqs.AutoTestConnection(),
sqs.WithUrl("aws-sqs-queue-url"),
)

suite.NotNil(err)
Expand Down