Skip to content

Commit

Permalink
feat!: specify group and pass the name
Browse files Browse the repository at this point in the history
  • Loading branch information
brokeyourbike committed Nov 20, 2022
1 parent 1660130 commit bb7dab6
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 27 deletions.
19 changes: 15 additions & 4 deletions q/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ type Message interface {
// uniqueKeyKey is the key used to store the unique ID in the message attributes.
const uniqueKeyKey = "uniqueKey"

// topicKey is the key used to store the topic ID in the message attributes.
const topicKey = "topicKey"
// nameKey is the key used to store the name in the message attributes.
const nameKey = "nameKey"

// groupKey is the key used to store the group ID in the message attributes.
const groupKey = "groupKey"

// PubSubMessage is the payload of a Pub/Sub event.
// See the documentation for more details:
Expand All @@ -39,8 +42,16 @@ func (m *PubSubMessage) GetUniqueKey() string {
return raw
}

func (m *PubSubMessage) GetTopicKey() string {
raw, ok := m.Message.Attributes[topicKey]
func (m *PubSubMessage) GetName() string {
raw, ok := m.Message.Attributes[nameKey]
if !ok {
return ""
}
return raw
}

func (m *PubSubMessage) GetGroup() string {
raw, ok := m.Message.Attributes[groupKey]
if !ok {
return ""
}
Expand Down
22 changes: 11 additions & 11 deletions q/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ type TaskOptionType int

const (
UniqueKeyOpt TaskOptionType = iota
GroupOpt
)

type PubSubOptionType int

const (
OrderedKeyOpt PubSubOptionType = iota
OrderedByTaskNameOpt
TopicOpt
)

type CloudTasksOptionType int
Expand Down Expand Up @@ -61,7 +61,7 @@ type CloudTasksOption interface {
// Internal option representations.
type (
uniqueKeyOption string
topicOption string
groupOption string
orderedByTaskNameOption bool
orderedKeyOption string
processAtOption time.Time
Expand All @@ -76,6 +76,15 @@ func (key uniqueKeyOption) String() string { return fmt.Sprintf("UniqueKey
func (key uniqueKeyOption) Type() TaskOptionType { return UniqueKeyOpt }
func (key uniqueKeyOption) Value() interface{} { return string(key) }

// Group returns an option to specify the group.
func Group(key string) TaskOption {
return groupOption(key)
}

func (key groupOption) String() string { return fmt.Sprintf("Group(%q)", string(key)) }
func (key groupOption) Type() TaskOptionType { return GroupOpt }
func (key groupOption) Value() interface{} { return string(key) }

// Ordered returns an option to specify the ordered key.
func OrderedByTaskName() PubSubOption {
return orderedByTaskNameOption(true)
Expand All @@ -94,15 +103,6 @@ func (key orderedKeyOption) String() string { return fmt.Sprintf("Ordere
func (key orderedKeyOption) Type() PubSubOptionType { return OrderedKeyOpt }
func (key orderedKeyOption) Value() interface{} { return string(key) }

// Topic returns an option to specify the unique key.
func Topic(key string) PubSubOption {
return topicOption(key)
}

func (key topicOption) String() string { return fmt.Sprintf("Topic(%q)", string(key)) }
func (key topicOption) Type() PubSubOptionType { return TopicOpt }
func (key topicOption) Value() interface{} { return string(key) }

// ProcessAt returns an option to specify when to process the given task.
func ProcessAt(t time.Time) CloudTasksOption {
return processAtOption(t)
Expand Down
22 changes: 10 additions & 12 deletions q/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,36 +25,34 @@ func (q *PubSubQ) Enqueue(ctx context.Context, task *Task, opts ...PubSubOption)

message := &pubsub.Message{
Data: task.payload,
Attributes: map[string]string{},
Attributes: map[string]string{nameKey: task.typename},
}

for _, opt := range task.opts {
switch opt := opt.(type) {
case uniqueKeyOption:
message.Attributes[uniqueKeyKey] = string(opt)
case groupOption:
topicID = string(opt)
}
}

message.Attributes[groupKey] = topicID

topic := q.client.Topic(topicID)
defer topic.Stop()

for _, opt := range opts {
switch opt := opt.(type) {
case orderedKeyOption:
topic.EnableMessageOrdering = true
message.OrderingKey = string(opt)
case orderedByTaskNameOption:
topic.EnableMessageOrdering = true
message.OrderingKey = task.typename
case topicOption:
topicID = string(opt)
}
}

message.Attributes[topicKey] = topicID

topic := q.client.Topic(topicID)
defer topic.Stop()

// no harm in always enabling this feature
// since it depends if OrderingKey is empty or not.
topic.EnableMessageOrdering = true

result := topic.Publish(ctx, message)

messageID, err := result.Get(ctx)
Expand Down

0 comments on commit bb7dab6

Please sign in to comment.