Skip to content

Commit

Permalink
Merge pull request #73 from glocurrency/ct
Browse files Browse the repository at this point in the history
feat: cloudtasks
  • Loading branch information
brokeyourbike authored Aug 17, 2024
2 parents 6d45db2 + 0e72077 commit 4e8449c
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 21 deletions.
41 changes: 28 additions & 13 deletions q/cloudtasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,43 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)

type Config interface {
GetProjectID() string
GetLocationID() string
GetBaseUrl() string
GetServiceAccountEmail() string
var ErrTaskIsNil = fmt.Errorf("task is nil")

type CloudTasksConfig struct {
ProjectID string
LocationID string
BaseUrl string
ServiceAccountEmail string
}

type CloudTasksQ interface {
// Enqueue enqueues a task to the ClousTasks queue.
Enqueue(ctx context.Context, task *Task, opts ...CloudTasksOption) (*TaskInfo, error)
// Enqueue enqueues a task with ClousTasks, and returns info.
EnqueueWithInfo(ctx context.Context, task *Task, opts ...CloudTasksOption) (*TaskInfo, error)
// Enqueue enqueues a task with ClousTasks.
Enqueue(ctx context.Context, task *Task, opts ...CloudTasksOption) error
}

type cloudTasksQ struct {
cfg Config
cfg CloudTasksConfig
client *cloudtasks.Client
}

func NewCloudTasksQ(cfg Config, client *cloudtasks.Client) *cloudTasksQ {
func NewCloudTasksQ(cfg CloudTasksConfig, client *cloudtasks.Client) *cloudTasksQ {
return &cloudTasksQ{cfg: cfg, client: client}
}

func (q *cloudTasksQ) Enqueue(ctx context.Context, task *Task, opts ...CloudTasksOption) (info *TaskInfo, err error) {
// Enqueue enqueues a task with ClousTasks.
func (q *cloudTasksQ) Enqueue(ctx context.Context, task *Task, opts ...CloudTasksOption) error {
_, err := q.EnqueueWithInfo(ctx, task, opts...)
return err
}

// Enqueue enqueues a task with ClousTasks, and returns info.
func (q *cloudTasksQ) EnqueueWithInfo(ctx context.Context, task *Task, opts ...CloudTasksOption) (info *TaskInfo, err error) {
if task == nil {
return nil, ErrTaskIsNil
}

queueID := task.typename
uniqueKey := ""

Expand All @@ -56,20 +71,20 @@ func (q *cloudTasksQ) Enqueue(ctx context.Context, task *Task, opts ...CloudTask
}
}

queuePath := fmt.Sprintf("projects/%s/locations/%s/queues/%s", q.cfg.GetProjectID(), q.cfg.GetLocationID(), queueID)
queuePath := fmt.Sprintf("projects/%s/locations/%s/queues/%s", q.cfg.ProjectID, q.cfg.LocationID, queueID)

req := &cloudtaskspb.CreateTaskRequest{
Parent: queuePath,
Task: &cloudtaskspb.Task{
MessageType: &cloudtaskspb.Task_HttpRequest{
HttpRequest: &cloudtaskspb.HttpRequest{
HttpMethod: cloudtaskspb.HttpMethod_POST,
Url: fmt.Sprintf("%s/%s", q.cfg.GetBaseUrl(), queueID),
Url: fmt.Sprintf("%s/%s", q.cfg.BaseUrl, queueID),
Body: payload,
Headers: map[string]string{"Content-Type": "application/json", nameKey: task.typename, groupKey: queueID},
AuthorizationHeader: &cloudtaskspb.HttpRequest_OidcToken{
OidcToken: &cloudtaskspb.OidcToken{
ServiceAccountEmail: q.cfg.GetServiceAccountEmail(),
ServiceAccountEmail: q.cfg.ServiceAccountEmail,
},
},
},
Expand Down
23 changes: 23 additions & 0 deletions q/cloudtasks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package q_test

import (
"context"
"testing"

cloudtasks "cloud.google.com/go/cloudtasks/apiv2"
"github.com/glocurrency/commons/q"
"github.com/stretchr/testify/require"
)

func TestCloudTasksQ_Enqueue_Marshal(t *testing.T) {
cannotMarshall := make(chan int)

ps := q.NewCloudTasksQ(q.CloudTasksConfig{}, &cloudtasks.Client{})

err := ps.Enqueue(context.TODO(), q.NewTask("test", cannotMarshall))
require.Error(t, err)
require.ErrorContains(t, err, "failed to marshal payload")

err = ps.Enqueue(context.TODO(), nil)
require.ErrorIs(t, err, q.ErrTaskIsNil)
}
10 changes: 5 additions & 5 deletions q/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
)

type PubSubQ interface {
// Enqueue enqueues a task to the Pub/Sub queue, and returns info.
// Enqueue enqueues a task with Pub/Sub, and returns info.
EnqueueWithInfo(ctx context.Context, task *Task, opts ...PubSubOption) (*TaskInfo, error)
// Enqueue enqueues a task to the Pub/Sub queue.
// Enqueue enqueues a task with Pub/Sub.
Enqueue(ctx context.Context, task *Task, opts ...PubSubOption) error
}

Expand All @@ -23,16 +23,16 @@ func NewPubSubQ(client *pubsub.Client) *pubSubQ {
return &pubSubQ{client: client}
}

// Enqueue enqueues a task to the Pub/Sub queue.
// Enqueue enqueues a task with Pub/Sub.
func (q *pubSubQ) Enqueue(ctx context.Context, task *Task, opts ...PubSubOption) error {
_, err := q.EnqueueWithInfo(ctx, task, opts...)
return err
}

// Enqueue enqueues a task to the Pub/Sub queue, and returns info.
// Enqueue enqueues a task with Pub/Sub, and returns info.
func (q *pubSubQ) EnqueueWithInfo(ctx context.Context, task *Task, opts ...PubSubOption) (info *TaskInfo, err error) {
if task == nil {
return nil, fmt.Errorf("task is nil")
return nil, ErrTaskIsNil
}

topicID := task.typename
Expand Down
5 changes: 2 additions & 3 deletions q/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/stretchr/testify/require"
)

func TestEnqueue_Marshal(t *testing.T) {
func TestPubSubQ_Enqueue_Marshal(t *testing.T) {
cannotMarshall := make(chan int)

ps := q.NewPubSubQ(&pubsub.Client{})
Expand All @@ -19,6 +19,5 @@ func TestEnqueue_Marshal(t *testing.T) {
require.ErrorContains(t, err, "failed to marshal payload")

err = ps.Enqueue(context.TODO(), nil)
require.Error(t, err)
require.ErrorContains(t, err, "task is nil")
require.ErrorIs(t, err, q.ErrTaskIsNil)
}

0 comments on commit 4e8449c

Please sign in to comment.