diff --git a/q/message.go b/q/message.go index 103c7d0..37e4cb6 100644 --- a/q/message.go +++ b/q/message.go @@ -14,8 +14,11 @@ type Message interface { // uniqueKeyKey is the key used to store the unique ID in the message attributes. const uniqueKeyKey = "uniqueKey" -// topicKey is the key used to store the topic ID in the message attributes. -const topicKey = "topicKey" +// nameKey is the key used to store the name in the message attributes. +const nameKey = "nameKey" + +// groupKey is the key used to store the group ID in the message attributes. +const groupKey = "groupKey" // PubSubMessage is the payload of a Pub/Sub event. // See the documentation for more details: @@ -39,8 +42,16 @@ func (m *PubSubMessage) GetUniqueKey() string { return raw } -func (m *PubSubMessage) GetTopicKey() string { - raw, ok := m.Message.Attributes[topicKey] +func (m *PubSubMessage) GetName() string { + raw, ok := m.Message.Attributes[nameKey] + if !ok { + return "" + } + return raw +} + +func (m *PubSubMessage) GetGroup() string { + raw, ok := m.Message.Attributes[groupKey] if !ok { return "" } diff --git a/q/option.go b/q/option.go index 7913c64..082d9c6 100644 --- a/q/option.go +++ b/q/option.go @@ -9,6 +9,7 @@ type TaskOptionType int const ( UniqueKeyOpt TaskOptionType = iota + GroupOpt ) type PubSubOptionType int @@ -16,7 +17,6 @@ type PubSubOptionType int const ( OrderedKeyOpt PubSubOptionType = iota OrderedByTaskNameOpt - TopicOpt ) type CloudTasksOptionType int @@ -61,7 +61,7 @@ type CloudTasksOption interface { // Internal option representations. type ( uniqueKeyOption string - topicOption string + groupOption string orderedByTaskNameOption bool orderedKeyOption string processAtOption time.Time @@ -76,6 +76,15 @@ func (key uniqueKeyOption) String() string { return fmt.Sprintf("UniqueKey func (key uniqueKeyOption) Type() TaskOptionType { return UniqueKeyOpt } func (key uniqueKeyOption) Value() interface{} { return string(key) } +// Group returns an option to specify the group. +func Group(key string) TaskOption { + return groupOption(key) +} + +func (key groupOption) String() string { return fmt.Sprintf("Group(%q)", string(key)) } +func (key groupOption) Type() TaskOptionType { return GroupOpt } +func (key groupOption) Value() interface{} { return string(key) } + // Ordered returns an option to specify the ordered key. func OrderedByTaskName() PubSubOption { return orderedByTaskNameOption(true) @@ -94,15 +103,6 @@ func (key orderedKeyOption) String() string { return fmt.Sprintf("Ordere func (key orderedKeyOption) Type() PubSubOptionType { return OrderedKeyOpt } func (key orderedKeyOption) Value() interface{} { return string(key) } -// Topic returns an option to specify the unique key. -func Topic(key string) PubSubOption { - return topicOption(key) -} - -func (key topicOption) String() string { return fmt.Sprintf("Topic(%q)", string(key)) } -func (key topicOption) Type() PubSubOptionType { return TopicOpt } -func (key topicOption) Value() interface{} { return string(key) } - // ProcessAt returns an option to specify when to process the given task. func ProcessAt(t time.Time) CloudTasksOption { return processAtOption(t) diff --git a/q/pubsub.go b/q/pubsub.go index 1e7231b..e91524f 100644 --- a/q/pubsub.go +++ b/q/pubsub.go @@ -25,36 +25,34 @@ func (q *PubSubQ) Enqueue(ctx context.Context, task *Task, opts ...PubSubOption) message := &pubsub.Message{ Data: task.payload, - Attributes: map[string]string{}, + Attributes: map[string]string{nameKey: task.typename}, } for _, opt := range task.opts { switch opt := opt.(type) { case uniqueKeyOption: message.Attributes[uniqueKeyKey] = string(opt) + case groupOption: + topicID = string(opt) } } + message.Attributes[groupKey] = topicID + + topic := q.client.Topic(topicID) + defer topic.Stop() + for _, opt := range opts { switch opt := opt.(type) { case orderedKeyOption: + topic.EnableMessageOrdering = true message.OrderingKey = string(opt) case orderedByTaskNameOption: + topic.EnableMessageOrdering = true message.OrderingKey = task.typename - case topicOption: - topicID = string(opt) } } - message.Attributes[topicKey] = topicID - - topic := q.client.Topic(topicID) - defer topic.Stop() - - // no harm in always enabling this feature - // since it depends if OrderingKey is empty or not. - topic.EnableMessageOrdering = true - result := topic.Publish(ctx, message) messageID, err := result.Get(ctx)