Skip to content

Supporting more properties for rabbitmq #3806

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 27 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
ed37e34
Add support for messageId, correlationId, and type in RabbitMQ bindings
Feb 5, 2025
76ad9bb
Squashed commit of the following:
Apr 29, 2025
7bc8e52
Merge branch 'main' into supporting_more_properties_for_rabbitmq_on_main
abossard May 2, 2025
c6c92e1
Merge branch 'main' into supporting_more_properties_for_rabbitmq_on_main
abossard May 7, 2025
d498240
refactor(rabbitmq): improve metadata validation in tests and enhance …
May 2, 2025
58fd68a
Merge branch 'main' into supporting_more_properties_for_rabbitmq_on_main
abossard May 12, 2025
80d02bb
Merge branch 'main' into supporting_more_properties_for_rabbitmq_on_main
abossard May 19, 2025
c0c5a0d
Merge branch 'main' into supporting_more_properties_for_rabbitmq_on_main
cicoyle May 22, 2025
f1f96b9
Merge branch 'main' into supporting_more_properties_for_rabbitmq_on_main
abossard May 23, 2025
b15b6cb
fix(rabbitmq): standardize metadata keys to lowercase and enhance deb…
May 30, 2025
734a45e
Merge branch 'dapr:main' into supporting_more_properties_for_rabbitmq…
abossard May 30, 2025
4ce6b46
fix(rabbitmq): update metadata keys to include 'metadata.' prefix for…
May 30, 2025
8b02c14
fix(rabbitmq): prefix custom headers in metadata with 'metadata.' for…
May 30, 2025
bc34115
feat(rabbitmq): add PublishMessagePropertiesToMetadata flag to contro…
May 30, 2025
2a504c4
fix(rabbitmq): add missing metadata validation logging in TestRabbitM…
May 30, 2025
a221a07
fix(rabbitmq): remove outdated comment regarding gRPC protocol in Tes…
May 30, 2025
4215d96
feat(rabbitmq): implement TryGetProperty function with case-insensiti…
Jun 3, 2025
92c925a
Merge branch 'main' into supporting_more_properties_for_rabbitmq_on_main
cicoyle Jun 20, 2025
90ff6cd
Merge branch 'main' into supporting_more_properties_for_rabbitmq_on_main
dapr-bot Jun 20, 2025
1eaa8c9
Merge branch 'main' into supporting_more_properties_for_rabbitmq_on_main
dapr-bot Jun 25, 2025
33b022c
Merge branch 'main' into supporting_more_properties_for_rabbitmq_on_main
dapr-bot Jun 25, 2025
9f4fcaf
Merge branch 'main' into supporting_more_properties_for_rabbitmq_on_main
dapr-bot Jun 26, 2025
f41d0f3
Merge branch 'main' into supporting_more_properties_for_rabbitmq_on_main
dapr-bot Jun 26, 2025
cd7042b
Merge branch 'main' into supporting_more_properties_for_rabbitmq_on_main
dapr-bot Jun 26, 2025
872f744
Merge branch 'main' into supporting_more_properties_for_rabbitmq_on_main
dapr-bot Jun 26, 2025
e3d009e
Merge branch 'main' into supporting_more_properties_for_rabbitmq_on_main
dapr-bot Jun 26, 2025
8159433
Merge branch 'main' into supporting_more_properties_for_rabbitmq_on_main
dapr-bot Jul 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions bindings/rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
@@ -31,6 +31,7 @@ import (
amqp "github.com/rabbitmq/amqp091-go"

"github.com/dapr/components-contrib/bindings"
common "github.com/dapr/components-contrib/common/component/rabbitmq"
"github.com/dapr/components-contrib/metadata"
"github.com/dapr/kit/logger"
kitmd "github.com/dapr/kit/metadata"
@@ -228,11 +229,6 @@ func (r *RabbitMQ) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bi
pub.Headers[k] = v
}

contentType, ok := metadata.TryGetContentType(req.Metadata)
if ok {
pub.ContentType = contentType
}

// The default time to live has been set in the queue
// We allow overriding on each call, by setting a value in request metadata
ttl, ok, err := metadata.TryGetTTL(req.Metadata)
@@ -252,6 +248,8 @@ func (r *RabbitMQ) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bi
pub.Priority = priority
}

common.ApplyMetadataToPublishing(req.Metadata, &pub)

err = ch.PublishWithContext(ctx, "", r.metadata.QueueName, false, false, pub)
if err != nil {
return nil, fmt.Errorf("failed to publish message: %w", err)
@@ -473,9 +471,9 @@ func (r *RabbitMQ) handleMessage(ctx context.Context, handler bindings.Handler,
// Passthrough any custom metadata to the handler.
for k, v := range d.Headers {
if s, ok := v.(string); ok {
// Escape the key and value to ensure they are valid URL query parameters.
// Escape the key to ensure they are valid URL query parameters.
// This is necessary for them to be sent as HTTP Metadata.
metadata[url.QueryEscape(k)] = url.QueryEscape(s)
metadata[url.QueryEscape(k)] = s
}
}

64 changes: 64 additions & 0 deletions bindings/rabbitmq/rabbitmq_integration_test.go
Original file line number Diff line number Diff line change
@@ -447,3 +447,67 @@ func TestPublishWithHeaders(t *testing.T) {
// assert.Contains(t, msg.Header, "custom_header1")
// assert.Contains(t, msg.Header, "custom_header2")
}

func TestPublishMetadataProperties(t *testing.T) {
rabbitmqHost := getTestRabbitMQHost()
require.NotEmpty(t, rabbitmqHost, fmt.Sprintf("RabbitMQ host configuration must be set in environment variable '%s'", testRabbitMQHostEnvKey))

queueName := uuid.New().String()
durable := true
exclusive := false

metadata := bindings.Metadata{
Base: contribMetadata.Base{
Name: "testQueue",
Properties: map[string]string{
"queueName": queueName,
"host": rabbitmqHost,
"deleteWhenUnused": strconv.FormatBool(exclusive),
"durable": strconv.FormatBool(durable),
},
},
}

logger := logger.NewLogger("test")
r := NewRabbitMQ(logger).(*RabbitMQ)
err := r.Init(t.Context(), metadata)
require.NoError(t, err)

conn, err := amqp.Dial(rabbitmqHost)
require.NoError(t, err)
defer conn.Close()

ch, err := conn.Channel()
require.NoError(t, err)
defer ch.Close()

const messageData = "test message"
const msgID = "msg-123"
const corrID = "corr-456"
const msgType = "testType"
const contentType = "application/json"

writeRequest := bindings.InvokeRequest{
Data: []byte(messageData),
Metadata: map[string]string{
"messageID": msgID,
"correlationID": corrID,
"type": msgType,
"contentType": contentType,
},
}
_, err = r.Invoke(t.Context(), &writeRequest)
require.NoError(t, err)

// Retrieve the message.
msg, ok, err := getMessageWithRetries(ch, queueName, 2*time.Second)
require.NoError(t, err)
assert.True(t, ok)
assert.Equal(t, messageData, string(msg.Body))
assert.Equal(t, msgID, msg.MessageId)
assert.Equal(t, corrID, msg.CorrelationId)
assert.Equal(t, msgType, msg.Type)
assert.Equal(t, contentType, msg.ContentType)

require.NoError(t, r.Close())
}
52 changes: 52 additions & 0 deletions common/component/rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package rabbitmq

import (
"strings"

amqp "github.com/rabbitmq/amqp091-go"
)

const (
MetadataKeyMessageID = "messageID"
MetadataKeyCorrelationID = "correlationID"
MetadataKeyContentType = "contentType"
MetadataKeyType = "type"
MetadataKeyPriority = "priority"
MetadataKeyTTL = "ttl"
)

// TryGetProperty finds a property value using case-insensitive matching
func TryGetProperty(props map[string]string, key string) (string, bool) {
// First try exact match
if val, ok := props[key]; ok && val != "" {
return val, true
}

// Then try case-insensitive match
for k, v := range props {
if v != "" && strings.EqualFold(key, k) {
return v, true
}
}

return "", false
}

// ApplyMetadataToPublishing applies common metadata fields to an AMQP publishing
func ApplyMetadataToPublishing(metadata map[string]string, publishing *amqp.Publishing) {
if contentType, ok := TryGetProperty(metadata, MetadataKeyContentType); ok {
publishing.ContentType = contentType
}

if messageID, ok := TryGetProperty(metadata, MetadataKeyMessageID); ok {
publishing.MessageId = messageID
}

if correlationID, ok := TryGetProperty(metadata, MetadataKeyCorrelationID); ok {
publishing.CorrelationId = correlationID
}

if aType, ok := TryGetProperty(metadata, MetadataKeyType); ok {
publishing.Type = aType
}
}
68 changes: 68 additions & 0 deletions common/component/rabbitmq/rabbitmq_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package rabbitmq

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestTryGetProperty(t *testing.T) {
tests := []struct {
name string
props map[string]string
key string
expected string
found bool
}{
{
name: "exact match",
props: map[string]string{"messageID": "test-id"},
key: "messageID",
expected: "test-id",
found: true,
},
{
name: "case insensitive match",
props: map[string]string{"messageid": "test-id"},
key: "messageID",
expected: "test-id",
found: true,
},
{
name: "uppercase match",
props: map[string]string{"MESSAGEID": "test-id"},
key: "messageID",
expected: "test-id",
found: true,
},
{
name: "not found",
props: map[string]string{"otherKey": "value"},
key: "messageID",
expected: "",
found: false,
},
{
name: "empty value",
props: map[string]string{"messageID": ""},
key: "messageID",
expected: "",
found: false,
},
{
name: "whitespace value",
props: map[string]string{"messageID": " "},
key: "messageID",
expected: " ",
found: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
value, found := TryGetProperty(tt.props, tt.key)
assert.Equal(t, tt.expected, value)
assert.Equal(t, tt.found, found)
})
}
}
2 changes: 1 addition & 1 deletion conversation/deepseek/metadata.yaml
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@ status: alpha
title: "Deepseek"
urls:
- title: Reference
url: https://docs.dapr.io/reference/components-reference/supported-conversation/deepseek/
url: https://docs.dapr.io/reference/components-reference/supported-conversation/setup-deepseek/
authenticationProfiles:
- title: "API Key"
description: "Authenticate using an API key"
34 changes: 32 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -236,6 +236,8 @@ github.com/aliyunmq/mq-http-go-sdk v1.0.3/go.mod h1:JYfRMQoPexERvnNNBcal0ZQ2TVQ5
github.com/andybalholm/brotli v1.0.0/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y=
github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/apache/dubbo-getty v1.4.9-0.20220610060150-8af010f3f3dc h1:NZRon3MDqT4vddR3UIRBnwbbhEerghAimCSBsiESs3g=
github.com/apache/dubbo-getty v1.4.9-0.20220610060150-8af010f3f3dc/go.mod h1:cPJlbcHUTNTpiboMQjMHhE9XBni11LiBiG8FdrDuVzk=
@@ -382,13 +384,17 @@ github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QH
github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/certifi/gocertifi v0.0.0-20191021191039-0944d244cd40/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chebyrash/promise v0.0.0-20230709133807-42ec49ba1459 h1:s7UrE2T8jRoriLIddT8fW5+Wf2sXcOgfteXUKD74SaU=
github.com/chebyrash/promise v0.0.0-20230709133807-42ec49ba1459/go.mod h1:CQthfPdCoGmlBJAG/sP9Km5nfK1/jGpDf1RiG/LUxXw=
github.com/chenzhuoyu/iasm v0.9.0/go.mod h1:Xjy2NpN3h7aUqeqM+woSuuvxmIe6+DDsiNLIrkAmYog=
@@ -627,12 +633,16 @@ github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/foxcpp/go-mockdns v1.1.0 h1:jI0rD8M0wuYAxL7r/ynTrCQQq0BVqfB99Vgk7DlmewI=
github.com/foxcpp/go-mockdns v1.1.0/go.mod h1:IhLeSFGed3mJIAXPH2aiRQB+kqz7oqu8ld2qVbOu7Wk=
github.com/foxcpp/go-mockdns v1.1.0 h1:jI0rD8M0wuYAxL7r/ynTrCQQq0BVqfB99Vgk7DlmewI=
github.com/foxcpp/go-mockdns v1.1.0/go.mod h1:IhLeSFGed3mJIAXPH2aiRQB+kqz7oqu8ld2qVbOu7Wk=
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20=
github.com/frankban/quicktest v1.10.0/go.mod h1:ui7WezCLWMWxVWr1GETZY3smRy0G4KWq9vcPtJmFl7Y=
github.com/frankban/quicktest v1.10.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU=
@@ -685,6 +695,8 @@ github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbV
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
@@ -1233,6 +1245,8 @@ github.com/miekg/dns v1.1.27/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7
github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI=
github.com/miekg/dns v1.1.57 h1:Jzi7ApEIzwEPLHWRcafCN9LZSBbqQpxjt/wpgvg7wcM=
github.com/miekg/dns v1.1.57/go.mod h1:uqRjCRUuEAA6qsOiJvDd+CFo/vW+y5WR6SNmHE55hZk=
github.com/miekg/dns v1.1.57 h1:Jzi7ApEIzwEPLHWRcafCN9LZSBbqQpxjt/wpgvg7wcM=
github.com/miekg/dns v1.1.57/go.mod h1:uqRjCRUuEAA6qsOiJvDd+CFo/vW+y5WR6SNmHE55hZk=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
@@ -1311,6 +1325,8 @@ github.com/nats-io/nats-server/v2 v2.9.23/go.mod h1:wEjrEy9vnqIGE4Pqz4/c75v9Pmaq
github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w=
github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E=
github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8=
github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E=
github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8=
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY=
@@ -1541,6 +1557,8 @@ github.com/sendgrid/sendgrid-go v3.13.0+incompatible h1:HZrzc06/QfBGesY9o3n1lvBr
github.com/sendgrid/sendgrid-go v3.13.0+incompatible/go.mod h1:QRQt+LX/NmgVEvmdRw0VT/QgUn499+iza2FnDca9fg8=
github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8=
github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I=
github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8=
github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I=
github.com/shirou/gopsutil v3.20.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil/v3 v3.21.6/go.mod h1:JfVbDpIBLVzT8oKbvMg9P3wEIMDDpVn+LwHTKj0ST88=
github.com/shirou/gopsutil/v3 v3.22.2/go.mod h1:WapW1AOOPlHyXr+yOyw3uYx36enocrtSoSBy0L5vUHY=
@@ -1635,6 +1653,8 @@ github.com/stvp/go-udp-testing v0.0.0-20201019212854-469649b16807/go.mod h1:7jxm
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
github.com/supplyon/gremcos v0.1.40 h1:OFJw3MV44HNE9N6SKYK0zRBbEwyugyyjjqeXiGi5E3w=
github.com/supplyon/gremcos v0.1.40/go.mod h1:LI6lxKObicSoIw1N04rHyjz9tGSaevM6Ydbo3XfyZfA=
github.com/tchap/go-patricia/v2 v2.3.2 h1:xTHFutuitO2zqKAQ5rCROYgUb7Or/+IC3fts9/Yc7nM=
@@ -1697,6 +1717,8 @@ github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyC
github.com/valyala/fasthttp v1.21.0/go.mod h1:jjraHZVbKOXftJfsOYoAjaeygpj5hr8ermTRJNroD7A=
github.com/valyala/fasthttp v1.53.0 h1:lW/+SUkOxCx2vlIu0iaImv4JLrVRnbbkpCoaawvA4zc=
github.com/valyala/fasthttp v1.53.0/go.mod h1:6dt4/8olwq9QARP/TDuPmWyWcl4byhpvTJ4AAtcz+QM=
github.com/valyala/fasthttp v1.53.0 h1:lW/+SUkOxCx2vlIu0iaImv4JLrVRnbbkpCoaawvA4zc=
github.com/valyala/fasthttp v1.53.0/go.mod h1:6dt4/8olwq9QARP/TDuPmWyWcl4byhpvTJ4AAtcz+QM=
github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
github.com/vmware/vmware-go-kcl v1.5.1 h1:1rJLfAX4sDnCyatNoD/WJzVafkwST6u/cgY/Uf2VgHk=
@@ -1754,14 +1776,20 @@ go.etcd.io/etcd/api/v3 v3.5.0-alpha.0/go.mod h1:mPcW6aZJukV6Aa81LSKpBjQXTWlXB5r7
go.etcd.io/etcd/api/v3 v3.5.4/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dYz5A=
go.etcd.io/etcd/api/v3 v3.5.10 h1:szRajuUUbLyppkhs9K6BRtjY37l66XQQmw7oZRANE4k=
go.etcd.io/etcd/api/v3 v3.5.10/go.mod h1:TidfmT4Uycad3NM/o25fG3J07odo4GBB9hoxaodFCtI=
go.etcd.io/etcd/api/v3 v3.5.10 h1:szRajuUUbLyppkhs9K6BRtjY37l66XQQmw7oZRANE4k=
go.etcd.io/etcd/api/v3 v3.5.10/go.mod h1:TidfmT4Uycad3NM/o25fG3J07odo4GBB9hoxaodFCtI=
go.etcd.io/etcd/client/pkg/v3 v3.5.4/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g=
go.etcd.io/etcd/client/pkg/v3 v3.5.10 h1:kfYIdQftBnbAq8pUWFXfpuuxFSKzlmM5cSn76JByiT0=
go.etcd.io/etcd/client/pkg/v3 v3.5.10/go.mod h1:DYivfIviIuQ8+/lCq4vcxuseg2P2XbHygkKwFo9fc8U=
go.etcd.io/etcd/client/pkg/v3 v3.5.10 h1:kfYIdQftBnbAq8pUWFXfpuuxFSKzlmM5cSn76JByiT0=
go.etcd.io/etcd/client/pkg/v3 v3.5.10/go.mod h1:DYivfIviIuQ8+/lCq4vcxuseg2P2XbHygkKwFo9fc8U=
go.etcd.io/etcd/client/v2 v2.305.0-alpha.0/go.mod h1:kdV+xzCJ3luEBSIeQyB/OEKkWKd8Zkux4sbDeANrosU=
go.etcd.io/etcd/client/v3 v3.5.0-alpha.0/go.mod h1:wKt7jgDgf/OfKiYmCq5WFGxOFAkVMLxiiXgLDFhECr8=
go.etcd.io/etcd/client/v3 v3.5.4/go.mod h1:ZaRkVgBZC+L+dLCjTcF1hRXpgZXQPOvnA/Ak/gq3kiY=
go.etcd.io/etcd/client/v3 v3.5.10 h1:W9TXNZ+oB3MCd/8UjxHTWK5J9Nquw9fQBLJd5ne5/Ao=
go.etcd.io/etcd/client/v3 v3.5.10/go.mod h1:RVeBnDz2PUEZqTpgqwAtUd8nAPf5kjyFyND7P1VkOKc=
go.etcd.io/etcd/client/v3 v3.5.10 h1:W9TXNZ+oB3MCd/8UjxHTWK5J9Nquw9fQBLJd5ne5/Ao=
go.etcd.io/etcd/client/v3 v3.5.10/go.mod h1:RVeBnDz2PUEZqTpgqwAtUd8nAPf5kjyFyND7P1VkOKc=
go.etcd.io/etcd/pkg/v3 v3.5.0-alpha.0/go.mod h1:tV31atvwzcybuqejDoY3oaNRTtlD2l/Ot78Pc9w7DMY=
go.etcd.io/etcd/raft/v3 v3.5.0-alpha.0/go.mod h1:FAwse6Zlm5v4tEWZaTjmNhe17Int4Oxbu7+2r0DiD3w=
go.etcd.io/etcd/server/v3 v3.5.0-alpha.0/go.mod h1:tsKetYpt980ZTpzl/gb+UOJj9RkIyCb1u4wjzMg90BQ=
@@ -1846,6 +1874,8 @@ go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
@@ -2042,8 +2072,8 @@ golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4Iltr
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc=
golang.org/x/oauth2 v0.27.0 h1:da9Vo7/tDv5RH/7nZDz1eMGS/q1Vv1N/7FCrBhI9I3M=
golang.org/x/oauth2 v0.27.0/go.mod h1:onh5ek6nERTohokkhCD/y2cV4Do3fxFHFuAejCkRWT8=
golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs=
golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
8 changes: 0 additions & 8 deletions metadata/utils.go
Original file line number Diff line number Diff line change
@@ -114,14 +114,6 @@ func IsRawPayload(props map[string]string) (bool, error) {
return false, nil
}

func TryGetContentType(props map[string]string) (string, bool) {
if val, ok := props[ContentType]; ok && val != "" {
return val, true
}

return "", false
}

func TryGetQueryIndexName(props map[string]string) (string, bool) {
if val, ok := props[QueryIndexName]; ok && val != "" {
return val, true
28 changes: 0 additions & 28 deletions metadata/utils_test.go
Original file line number Diff line number Diff line change
@@ -175,34 +175,6 @@ func TestIsRawPayload(t *testing.T) {
})
}

func TestTryGetContentType(t *testing.T) {
t.Run("Metadata without content type", func(t *testing.T) {
val, ok := TryGetContentType(map[string]string{})

assert.Equal(t, "", val)
assert.False(t, ok)
})

t.Run("Metadata with empty content type", func(t *testing.T) {
val, ok := TryGetContentType(map[string]string{
"contentType": "",
})

assert.Equal(t, "", val)
assert.False(t, ok)
})

t.Run("Metadata with corrent content type", func(t *testing.T) {
const contentType = "application/cloudevent+json"
val, ok := TryGetContentType(map[string]string{
"contentType": contentType,
})

assert.Equal(t, contentType, val)
assert.True(t, ok)
})
}

func TestMetadataStructToStringMap(t *testing.T) {
t.Run("Test metadata struct to metadata info conversion", func(t *testing.T) {
type NestedStruct struct {
107 changes: 55 additions & 52 deletions pubsub/rabbitmq/metadata.go
Original file line number Diff line number Diff line change
@@ -27,31 +27,32 @@ import (
)

type rabbitmqMetadata struct {
pubsub.TLSProperties `mapstructure:",squash"`
ConsumerID string `mapstructure:"consumerID" mdignore:"true"`
ConnectionString string `mapstructure:"connectionString"`
Protocol string `mapstructure:"protocol"`
internalProtocol string `mapstructure:"-"`
Hostname string `mapstructure:"hostname"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
Durable bool `mapstructure:"durable"`
EnableDeadLetter bool `mapstructure:"enableDeadLetter"`
DeleteWhenUnused bool `mapstructure:"deletedWhenUnused"`
AutoAck bool `mapstructure:"autoAck"`
RequeueInFailure bool `mapstructure:"requeueInFailure"`
DeliveryMode uint8 `mapstructure:"deliveryMode"` // Transient (0 or 1) or Persistent (2)
PrefetchCount uint8 `mapstructure:"prefetchCount"` // Prefetch deactivated if 0
ReconnectWait time.Duration `mapstructure:"reconnectWaitSeconds"`
MaxLen int64 `mapstructure:"maxLen"`
MaxLenBytes int64 `mapstructure:"maxLenBytes"`
ExchangeKind string `mapstructure:"exchangeKind"`
ClientName string `mapstructure:"clientName"`
HeartBeat time.Duration `mapstructure:"heartBeat"`
PublisherConfirm bool `mapstructure:"publisherConfirm"`
SaslExternal bool `mapstructure:"saslExternal"`
Concurrency pubsub.ConcurrencyMode `mapstructure:"concurrency"`
DefaultQueueTTL *time.Duration `mapstructure:"ttlInSeconds"`
pubsub.TLSProperties `mapstructure:",squash"`
ConsumerID string `mapstructure:"consumerID" mdignore:"true"`
ConnectionString string `mapstructure:"connectionString"`
Protocol string `mapstructure:"protocol"`
internalProtocol string `mapstructure:"-"`
Hostname string `mapstructure:"hostname"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
Durable bool `mapstructure:"durable"`
EnableDeadLetter bool `mapstructure:"enableDeadLetter"`
DeleteWhenUnused bool `mapstructure:"deletedWhenUnused"`
AutoAck bool `mapstructure:"autoAck"`
RequeueInFailure bool `mapstructure:"requeueInFailure"`
DeliveryMode uint8 `mapstructure:"deliveryMode"` // Transient (0 or 1) or Persistent (2)
PrefetchCount uint8 `mapstructure:"prefetchCount"` // Prefetch deactivated if 0
ReconnectWait time.Duration `mapstructure:"reconnectWaitSeconds"`
MaxLen int64 `mapstructure:"maxLen"`
MaxLenBytes int64 `mapstructure:"maxLenBytes"`
ExchangeKind string `mapstructure:"exchangeKind"`
ClientName string `mapstructure:"clientName"`
HeartBeat time.Duration `mapstructure:"heartBeat"`
PublisherConfirm bool `mapstructure:"publisherConfirm"`
SaslExternal bool `mapstructure:"saslExternal"`
Concurrency pubsub.ConcurrencyMode `mapstructure:"concurrency"`
DefaultQueueTTL *time.Duration `mapstructure:"ttlInSeconds"`
PublishMessagePropertiesToMetadata bool `mapstructure:"publishMessagePropertiesToMetadata"`
}

const (
@@ -65,23 +66,24 @@ const (
metadataUsernameKey = "username"
metadataPasswordKey = "password"

metadataDurableKey = "durable"
metadataEnableDeadLetterKey = "enableDeadLetter"
metadataDeleteWhenUnusedKey = "deletedWhenUnused"
metadataAutoAckKey = "autoAck"
metadataRequeueInFailureKey = "requeueInFailure"
metadataDeliveryModeKey = "deliveryMode"
metadataPrefetchCountKey = "prefetchCount"
metadataReconnectWaitSecondsKey = "reconnectWaitSeconds"
metadataMaxLenKey = "maxLen"
metadataMaxLenBytesKey = "maxLenBytes"
metadataExchangeKindKey = "exchangeKind"
metadataPublisherConfirmKey = "publisherConfirm"
metadataSaslExternal = "saslExternal"
metadataMaxPriority = "maxPriority"
metadataClientNameKey = "clientName"
metadataHeartBeatKey = "heartBeat"
metadataQueueNameKey = "queueName"
metadataDurableKey = "durable"
metadataEnableDeadLetterKey = "enableDeadLetter"
metadataDeleteWhenUnusedKey = "deletedWhenUnused"
metadataAutoAckKey = "autoAck"
metadataRequeueInFailureKey = "requeueInFailure"
metadataDeliveryModeKey = "deliveryMode"
metadataPrefetchCountKey = "prefetchCount"
metadataReconnectWaitSecondsKey = "reconnectWaitSeconds"
metadataMaxLenKey = "maxLen"
metadataMaxLenBytesKey = "maxLenBytes"
metadataExchangeKindKey = "exchangeKind"
metadataPublisherConfirmKey = "publisherConfirm"
metadataSaslExternal = "saslExternal"
metadataMaxPriority = "maxPriority"
metadataClientNameKey = "clientName"
metadataHeartBeatKey = "heartBeat"
metadataQueueNameKey = "queueName"
metadataPublishMessagePropertiesToMetadataKey = "publishMessagePropertiesToMetadata"

defaultReconnectWaitSeconds = 3

@@ -92,16 +94,17 @@ const (
// createMetadata creates a new instance from the pubsub metadata.
func createMetadata(pubSubMetadata pubsub.Metadata, log logger.Logger) (*rabbitmqMetadata, error) {
result := rabbitmqMetadata{
internalProtocol: protocolAMQP,
Hostname: "localhost",
Durable: true,
DeleteWhenUnused: true,
AutoAck: false,
ReconnectWait: time.Duration(defaultReconnectWaitSeconds) * time.Second,
ExchangeKind: fanoutExchangeKind,
PublisherConfirm: false,
SaslExternal: false,
HeartBeat: defaultHeartbeat,
internalProtocol: protocolAMQP,
Hostname: "localhost",
Durable: true,
DeleteWhenUnused: true,
AutoAck: false,
ReconnectWait: time.Duration(defaultReconnectWaitSeconds) * time.Second,
ExchangeKind: fanoutExchangeKind,
PublisherConfirm: false,
SaslExternal: false,
HeartBeat: defaultHeartbeat,
PublishMessagePropertiesToMetadata: false,
}

// upgrade metadata
1 change: 1 addition & 0 deletions pubsub/rabbitmq/metadata_test.go
Original file line number Diff line number Diff line change
@@ -99,6 +99,7 @@ func TestCreateMetadata(t *testing.T) {
assert.Equal(t, "", m.CACert)
assert.Equal(t, fanoutExchangeKind, m.ExchangeKind)
assert.True(t, m.Durable)
assert.False(t, m.PublishMessagePropertiesToMetadata)
})

invalidDeliveryModes := []string{"3", "10", "-1"}
52 changes: 50 additions & 2 deletions pubsub/rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@ import (

amqp "github.com/rabbitmq/amqp091-go"

common "github.com/dapr/components-contrib/common/component/rabbitmq"
"github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
@@ -259,6 +260,8 @@ func (r *rabbitMQ) publishSync(ctx context.Context, req *pubsub.PublishRequest)
p.Priority = priority
}

common.ApplyMetadataToPublishing(req.Metadata, &p)

confirm, err := r.channel.PublishWithDeferredConfirmWithContext(ctx, req.Topic, routingKey, false, false, p)
if err != nil {
r.logger.Errorf("%s publishing to %s failed in channel.Publish: %v", logMessagePrefix, req.Topic, err)
@@ -620,8 +623,13 @@ func (r *rabbitMQ) listenMessages(ctx context.Context, channel rabbitMQChannelBr

func (r *rabbitMQ) handleMessage(ctx context.Context, d amqp.Delivery, topic string, handler pubsub.Handler) error {
pubsubMsg := &pubsub.NewMessage{
Data: d.Body,
Topic: topic,
Data: d.Body,
Topic: topic,
Metadata: map[string]string{},
}

if r.metadata.PublishMessagePropertiesToMetadata {
pubsubMsg.Metadata = addAMQPPropertiesToMetadata(d)
}

err := handler(ctx, pubsubMsg)
@@ -745,3 +753,43 @@ func (r *rabbitMQ) GetComponentMetadata() (metadataInfo metadata.MetadataMap) {
func queueTypeValid(qType string) bool {
return qType == amqp.QueueTypeClassic || qType == amqp.QueueTypeQuorum
}

// Add this function to extract metadata from AMQP delivery
func addAMQPPropertiesToMetadata(delivery amqp.Delivery) map[string]string {
metadata := map[string]string{}

// Add message properties as metadata
if delivery.MessageId != "" {
metadata["metadata.messageid"] = delivery.MessageId
}

if delivery.CorrelationId != "" {
metadata["metadata.correlationid"] = delivery.CorrelationId
}

if delivery.Type != "" {
metadata["metadata.type"] = delivery.Type
}

if delivery.ContentType != "" {
metadata["metadata.contenttype"] = delivery.ContentType
}

// Add any custom headers
for k, v := range delivery.Headers {
metadataPrefixedKey := "metadata." + k
if v != nil {
switch value := v.(type) {
case string:
metadata[metadataPrefixedKey] = value
case []byte:
metadata[metadataPrefixedKey] = string(value)
default:
// Try to convert other types to string
metadata[metadataPrefixedKey] = fmt.Sprintf("%v", v)
}
}
}

return metadata
}
218 changes: 213 additions & 5 deletions pubsub/rabbitmq/rabbitmq_test.go
Original file line number Diff line number Diff line change
@@ -461,10 +461,11 @@ func createAMQPMessage(body []byte) amqp.Delivery {
}

type rabbitMQInMemoryBroker struct {
buffer chan amqp.Delivery
declaredQueues []string
connectCount atomic.Int32
closeCount atomic.Int32
buffer chan amqp.Delivery
declaredQueues []string
connectCount atomic.Int32
closeCount atomic.Int32
lastMsgMetadata *amqp.Publishing // Add this field to capture the last message metadata
}

func (r *rabbitMQInMemoryBroker) Qos(prefetchCount, prefetchSize int, global bool) error {
@@ -482,7 +483,17 @@ func (r *rabbitMQInMemoryBroker) PublishWithDeferredConfirmWithContext(ctx conte
return nil, errors.New(errorChannelConnection)
}

r.buffer <- createAMQPMessage(msg.Body)
// Store the last message metadata for inspection in tests
r.lastMsgMetadata = &msg

// Use a non-blocking send or a separate goroutine to prevent deadlock
// when there's no consumer reading from the buffer
select {
case r.buffer <- createAMQPMessage(msg.Body):
// Message sent successfully
default:
// Buffer is full or there's no consumer, but we don't want to block
}

return nil, nil
}
@@ -525,3 +536,200 @@ func (r *rabbitMQInMemoryBroker) Close() error {
func (r *rabbitMQInMemoryBroker) IsClosed() bool {
return r.connectCount.Load() <= r.closeCount.Load()
}

// TestPublishMetadataProperties tests that message metadata properties are correctly passed to the broker
func TestPublishMetadataProperties(t *testing.T) {
broker := newBroker()
pubsubRabbitMQ := newRabbitMQTest(broker)
metadata := pubsub.Metadata{Base: mdata.Base{
Properties: map[string]string{
metadataHostnameKey: "anyhost",
metadataConsumerIDKey: "consumer",
},
}}
err := pubsubRabbitMQ.Init(t.Context(), metadata)
require.NoError(t, err)

topic := "metadatatest"

// Create a consumer for the test to prevent channel deadlock
messageHandler := func(ctx context.Context, msg *pubsub.NewMessage) error {
return nil
}
err = pubsubRabbitMQ.Subscribe(t.Context(), pubsub.SubscribeRequest{Topic: topic}, messageHandler)
require.NoError(t, err)

// Test messageID
err = pubsubRabbitMQ.Publish(t.Context(), &pubsub.PublishRequest{
Topic: topic,
Data: []byte("test message"),
Metadata: map[string]string{
"messageID": "msg-123",
},
})
require.NoError(t, err)
assert.Equal(t, "msg-123", broker.lastMsgMetadata.MessageId)

// Test correlationID
err = pubsubRabbitMQ.Publish(t.Context(), &pubsub.PublishRequest{
Topic: topic,
Data: []byte("test message"),
Metadata: map[string]string{
"correlationID": "corr-456",
},
})
require.NoError(t, err)
assert.Equal(t, "corr-456", broker.lastMsgMetadata.CorrelationId)

// Test Type
err = pubsubRabbitMQ.Publish(t.Context(), &pubsub.PublishRequest{
Topic: topic,
Data: []byte("test message"),
Metadata: map[string]string{
"type": "mytype",
},
})
require.NoError(t, err)
assert.Equal(t, "mytype", broker.lastMsgMetadata.Type)

// Test all properties together
err = pubsubRabbitMQ.Publish(t.Context(), &pubsub.PublishRequest{
Topic: topic,
Data: []byte("test message"),
Metadata: map[string]string{
"messageID": "msg-789",
"correlationID": "corr-789",
"type": "complete-type",
"contentType": "application/json",
},
})
require.NoError(t, err)
assert.Equal(t, "msg-789", broker.lastMsgMetadata.MessageId)
assert.Equal(t, "corr-789", broker.lastMsgMetadata.CorrelationId)
assert.Equal(t, "complete-type", broker.lastMsgMetadata.Type)
assert.Equal(t, "application/json", broker.lastMsgMetadata.ContentType)
}

func TestPublishMessagePropertiesToMetadataFlag(t *testing.T) {
topicName := "test-topic"
messageData := []byte("test message data")

t.Run("flag is true", func(t *testing.T) {
broker := newBroker()
pubsubRabbitMQ := newRabbitMQTest(broker)
metadata := pubsub.Metadata{Base: mdata.Base{
Properties: map[string]string{
metadataHostnameKey: "anyhost",
metadataConsumerIDKey: "consumer",
metadataPublishMessagePropertiesToMetadataKey: "true",
},
}}
err := pubsubRabbitMQ.Init(t.Context(), metadata)
require.NoError(t, err)

var receivedMsg *pubsub.NewMessage
processed := make(chan bool)
handler := func(ctx context.Context, msg *pubsub.NewMessage) error {
receivedMsg = msg
processed <- true
return nil
}

err = pubsubRabbitMQ.Subscribe(t.Context(), pubsub.SubscribeRequest{Topic: topicName}, handler)
require.NoError(t, err)

// Publish a message with some AMQP properties
broker.buffer <- amqp.Delivery{
Body: messageData,
MessageId: "msg-id-true",
ContentType: "text/plain",
Headers: amqp.Table{
"customHeader": "customValue",
},
}

<-processed
require.NotNil(t, receivedMsg)
assert.Equal(t, messageData, receivedMsg.Data)
assert.Equal(t, topicName, receivedMsg.Topic)
assert.Equal(t, "msg-id-true", receivedMsg.Metadata["metadata.messageid"])
assert.Equal(t, "text/plain", receivedMsg.Metadata["metadata.contenttype"])
assert.Equal(t, "customValue", receivedMsg.Metadata["metadata.customHeader"])
})

t.Run("flag is false", func(t *testing.T) {
broker := newBroker()
pubsubRabbitMQ := newRabbitMQTest(broker)
metadata := pubsub.Metadata{Base: mdata.Base{
Properties: map[string]string{
metadataHostnameKey: "anyhost",
metadataConsumerIDKey: "consumer",
metadataPublishMessagePropertiesToMetadataKey: "false", // Explicitly false
},
}}
err := pubsubRabbitMQ.Init(t.Context(), metadata)
require.NoError(t, err)

var receivedMsg *pubsub.NewMessage
processed := make(chan bool)
handler := func(ctx context.Context, msg *pubsub.NewMessage) error {
receivedMsg = msg
processed <- true
return nil
}

err = pubsubRabbitMQ.Subscribe(t.Context(), pubsub.SubscribeRequest{Topic: topicName}, handler)
require.NoError(t, err)

// Publish a message with some AMQP properties
broker.buffer <- amqp.Delivery{
Body: messageData,
MessageId: "msg-id-false",
ContentType: "application/xml",
}

<-processed
require.NotNil(t, receivedMsg)
assert.Equal(t, messageData, receivedMsg.Data)
assert.Equal(t, topicName, receivedMsg.Topic)
assert.Empty(t, receivedMsg.Metadata, "Metadata should be empty when flag is false")
})

t.Run("flag is not set (default to false)", func(t *testing.T) {
broker := newBroker()
pubsubRabbitMQ := newRabbitMQTest(broker)
metadata := pubsub.Metadata{Base: mdata.Base{
Properties: map[string]string{
metadataHostnameKey: "anyhost",
metadataConsumerIDKey: "consumer",
// metadataPublishMessagePropertiesToMetadataKey is not set
},
}}
err := pubsubRabbitMQ.Init(t.Context(), metadata)
require.NoError(t, err)

var receivedMsg *pubsub.NewMessage
processed := make(chan bool)
handler := func(ctx context.Context, msg *pubsub.NewMessage) error {
receivedMsg = msg
processed <- true
return nil
}

err = pubsubRabbitMQ.Subscribe(t.Context(), pubsub.SubscribeRequest{Topic: topicName}, handler)
require.NoError(t, err)

// Publish a message with some AMQP properties
broker.buffer <- amqp.Delivery{
Body: messageData,
MessageId: "msg-id-default",
ContentType: "application/json",
}

<-processed
require.NotNil(t, receivedMsg)
assert.Equal(t, messageData, receivedMsg.Data)
assert.Equal(t, topicName, receivedMsg.Topic)
assert.Empty(t, receivedMsg.Metadata, "Metadata should be empty when flag is not set (defaults to false)")
})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: metadata-binding
spec:
type: bindings.rabbitmq
version: v1
metadata:
- name: queueName
value: metadataQueue
- name: host
value: "amqp://test:test@localhost:5672"
- name: durable
value: true
- name: deleteWhenUnused
value: false
96 changes: 96 additions & 0 deletions tests/certification/bindings/rabbitmq/rabbitmq_test.go
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@ import (
"log"
"os"
"strconv"
"strings"
"testing"
"time"

@@ -614,6 +615,101 @@ func amqpMtlsExternalAuthReady(url string) flow.Runnable {
}
}

func getMetadataValueCI(metadata map[string]string, key string) (string, bool) {
for k, v := range metadata {
if strings.EqualFold(k, key) {
return v, true
}
}
return "", false
}

func TestRabbitMQMetadataProperties(t *testing.T) {
messages := watcher.NewUnordered()

ports, _ := dapr_testing.GetFreePorts(3)
grpcPort := ports[0]
httpPort := ports[1]
appPort := ports[2]

// Define the test values for metadata with fixed IDs
const messageData = "metadata-test-message"
const msgID = "msg-id-123"
const corrID = "corr-id-456"
const msgType = "test-type"
const contentType = "application/json"

test := func(ctx flow.Context) error {
client, err := daprClient.NewClientWithPort(fmt.Sprintf("%d", grpcPort))
require.NoError(t, err, "Could not initialize dapr client.")

metadata := map[string]string{
"messageID": msgID,
"correlationID": corrID,
"type": msgType,
"contentType": contentType,
}

ctx.Log("Invoking binding with metadata properties!")
req := &daprClient.InvokeBindingRequest{
Name: "metadata-binding",
Operation: "create",
Data: []byte(messageData),
Metadata: metadata,
}

err = client.InvokeOutputBinding(ctx, req)
require.NoError(ctx, err, "error publishing message with metadata")

// Assertion on the data and metadata.
messages.ExpectStrings(messageData)
messages.Assert(ctx, time.Minute)

return nil
}

application := func(ctx flow.Context, s common.Service) (err error) {
// Setup the input binding endpoint.
err = multierr.Combine(err,
s.AddBindingInvocationHandler("metadata-binding", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
msg := string(in.Data)
messages.Observe(msg)

// Log the received metadata for debugging
ctx.Logf("Got message: %s with metadata: %+v", msg, in.Metadata)

msgIdVal, _ := getMetadataValueCI(in.Metadata, "messageid")
corrIdVal, _ := getMetadataValueCI(in.Metadata, "correlationid")
contentTypeVal, _ := getMetadataValueCI(in.Metadata, "contenttype")
typeVal, _ := getMetadataValueCI(in.Metadata, "type")

require.Equal(t, msgID, msgIdVal, "messageID should match expected value")
require.Equal(t, corrID, corrIdVal, "correlationID should match expected value")
require.Equal(t, contentType, contentTypeVal, "contentType should match expected value")
require.Equal(t, msgType, typeVal, "type should match expected value")

return []byte("{}"), nil
}))
return err
}

flow.New(t, "rabbitmq metadata properties certification").
Step(dockercompose.Run(clusterName, dockerComposeYAML)).
Step("wait for rabbitmq readiness",
retry.Do(time.Second, 30, amqpReady(rabbitMQURL))).
Step(app.Run("metadataApp", fmt.Sprintf(":%d", appPort), application)).
Step(sidecar.Run("metadataSidecar",
append(componentRuntimeOptions(),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)),
embedded.WithDaprGRPCPort(strconv.Itoa(grpcPort)),
embedded.WithDaprHTTPPort(strconv.Itoa(httpPort)),
embedded.WithComponentsPath("./components/metadata"),
)...,
)).
Step("send with metadata and verify", test).
Run()
}

func componentRuntimeOptions() []embedded.Option {
log := logger.NewLogger("dapr.components")

Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: mq-metadata
spec:
type: pubsub.rabbitmq
version: v1
metadata:
- name: consumerID
value: metadata
- name: host
value: "amqp://test:test@localhost:5672"
- name: durable
value: true
- name: deletedWhenUnused
value: false
- name: requeueInFailure
value: true
- name: publishMessagePropertiesToMetadata
value: true
178 changes: 178 additions & 0 deletions tests/certification/pubsub/rabbitmq/rabbitmq_test.go
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@ import (
"math/rand"
"os"
"strconv"
"strings"
"sync"
"testing"
"time"
@@ -54,11 +55,13 @@ const (
sidecarName2 = "dapr-2"
sidecarName3 = "dapr-3"
sidecarName4 = "dapr-4"
sidecarNameMetadata = "dapr-metadata"
sidecarNameTTLClient = "dapr-ttl-client"
appID1 = "app-1"
appID2 = "app-2"
appID3 = "app-3"
appID4 = "app-4"
appIDMetadata = "app-metadata"
clusterName = "rabbitmqcertification"
dockerComposeYAML = "docker-compose.yml"
extSaslDockerComposeYAML = "mtls_sasl_external/docker-compose.yml"
@@ -72,6 +75,7 @@ const (
pubsubAlpha = "mq-alpha"
pubsubBeta = "mq-beta"
pubsubMtlsExternal = "mq-mtls"
pubsubMetadata = "mq-metadata"
pubsubMessageOnlyTTL = "msg-ttl-pubsub"
pubsubQueueOnlyTTL = "overwrite-ttl-pubsub"
pubsubOverwriteTTL = "queue-ttl-pubsub"
@@ -84,6 +88,8 @@ const (
topicTTL1 = "ttl1"
topicTTL2 = "ttl2"
topicTTL3 = "ttl3"

topicMetadata = "metadata-topic"
)

type Consumer struct {
@@ -853,6 +859,178 @@ func TestRabbitMQPriority(t *testing.T) {
Run()
}

func getMetadataValueCI(metadata map[string]string, key string) (string, bool) {
for k, v := range metadata {
if strings.EqualFold(k, key) {
return v, true
}
}
return "", false
}

func TestRabbitMQMetadataProperties(t *testing.T) {
messagesWatcher := watcher.NewUnordered()

// Define the test values for metadata with fixed IDs
const messageCount = 10
const msgID = "msg-id-123"
const corrID = "corr-id-456"
const msgType = "test-type"
const contentType = "application/json"

messages := make([]string, messageCount)
for i := range messageCount {
messages[i] = fmt.Sprintf("Test message %d", i+1)
}

// Use a channel to collect metadata validation errors
metadataErrors := make(chan error, 1)

// Application logic that tracks messages with their metadata
metadataApp := func(ctx flow.Context, s common.Service) (err error) {
// Setup the topic event handler for metadata testing
err = s.AddTopicEventHandler(&common.Subscription{
PubsubName: pubsubMetadata,
Topic: topicMetadata,
Route: "/metadata",
Metadata: map[string]string{},
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
// ENHANCED DEBUGGING - Print every detail of the message
ctx.Logf("==================== MESSAGE RECEIVED ====================")
ctx.Logf("Topic: %s", e.Topic)
ctx.Logf("PubsubName: %s", e.PubsubName)
ctx.Logf("ID: %s", e.ID)
ctx.Logf("Data Type: %T", e.Data)
ctx.Logf("Data: %s", e.Data)
ctx.Logf("---------------- METADATA DUMP ----------------")
ctx.Logf("Total metadata entries: %d", len(e.Metadata))
ctx.Logf("Raw metadata map: %+v", e.Metadata)

ctx.Logf("---------------- TARGET METADATA VALUES ----------------")
msgIdVal, _ := getMetadataValueCI(e.Metadata, "messageid")
corrIdVal, _ := getMetadataValueCI(e.Metadata, "correlationid")
contentTypeVal, _ := getMetadataValueCI(e.Metadata, "contenttype")
typeVal, _ := getMetadataValueCI(e.Metadata, "type")

ctx.Logf(" → messageID: '%s' (expected: '%s')", msgIdVal, msgID)
ctx.Logf(" → correlationID: '%s' (expected: '%s')", corrIdVal, corrID)
ctx.Logf(" → contentType: '%s' (expected: '%s')", contentTypeVal, contentType)
ctx.Logf(" → type: '%s' (expected: '%s')", typeVal, msgType)

// Instead of failing silently, collect errors and send them to the channel
var metadataErr error

if msgIdVal != msgID {
ctx.Logf("ERROR: messageID not found or incorrect: value=%s", msgIdVal)
metadataErr = fmt.Errorf("expected messageID: %s, got: %s", msgID, msgIdVal)
}

if corrIdVal != corrID {
ctx.Logf("ERROR: correlationID not found or incorrect: value=%s", corrIdVal)
if metadataErr != nil {
metadataErr = fmt.Errorf("%w; expected correlationID: %s, got: %s",
metadataErr, corrID, corrIdVal)
} else {
metadataErr = fmt.Errorf("expected correlationID: %s, got: %s", corrID, corrIdVal)
}
}

if contentTypeVal != contentType {
ctx.Logf("ERROR: contentType not found or incorrect: value=%s", contentTypeVal)
if metadataErr != nil {
metadataErr = fmt.Errorf("%w; expected contentType: %s, got: %s",
metadataErr, contentType, contentTypeVal)
} else {
metadataErr = fmt.Errorf("expected contentType: %s, got: %s", contentType, contentTypeVal)
}
}

if typeVal != msgType {
ctx.Logf("ERROR: type not found or incorrect: value=%s", typeVal)
if metadataErr != nil {
metadataErr = fmt.Errorf("%w; expected type: %s, got: %s",
metadataErr, msgType, typeVal)
} else {
metadataErr = fmt.Errorf("expected type: %s, got: %s", msgType, typeVal)
}
}

dataStr, ok := e.Data.(string)
if !ok {
return false, fmt.Errorf("e.Data is not a string, got %T", e.Data)
}
// If there are any metadata errors, send them to the channel
if metadataErr != nil {
ctx.Logf("Metadata validation failed: %s", metadataErr)
metadataErrors <- metadataErr
}

messagesWatcher.Observe(dataStr)
ctx.Logf("Got message: %s with all expected metadata properties", e.Data)
return false, nil
})

return err
}

// Test function to publish messages with metadata
testMetadata := func(ctx flow.Context) error {
// Get the Dapr client
client := sidecar.GetClient(ctx, sidecarNameMetadata)
messagesWatcher.ExpectStrings(messages...)

// Publish messages with metadata properties
ctx.Log("Publishing messages with metadata properties")
for i := 0; i < messageCount; i++ {
err := client.PublishEvent(ctx, pubsubMetadata, topicMetadata, messages[i],
daprClient.PublishEventWithMetadata(map[string]string{
"messageID": msgID,
"correlationID": corrID,
"contentType": contentType,
"type": msgType,
}))
require.NoError(ctx, err, "Failed publishing message with metadata")
}

// Check for metadata errors with timeout
select {
case err := <-metadataErrors:
return fmt.Errorf("metadata validation failed: %w", err)
case <-time.After(5 * time.Second):
// No errors within timeout, continue with message assertion
}

// Verify all messages were processed correctly
ctx.Log("Verifying messages were received...")
messagesWatcher.Assert(t, 20*time.Second)

return nil
}

// Run the test flow
flow.New(t, "rabbitmq metadata properties pubsub certification").
// Start RabbitMQ container
Step(dockercompose.Run(clusterName, dockerComposeYAML)).
Step("wait for rabbitmq readiness", retry.Do(time.Second, 30, amqpReady(rabbitMQURL))).
// Run the metadata app and sidecar
Step(app.Run(appIDMetadata, fmt.Sprintf(":%d", appPort+10), metadataApp)).
Step(sidecar.Run(sidecarNameMetadata,
append(componentRuntimeOptions(),
embedded.WithComponentsPath("./components/metadata"),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort+10)),
embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort+20)),
embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort+10)),
embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+10)),
embedded.WithGracefulShutdownDuration(2*time.Second),
)...,
)).
// Wait for subscription to complete
Step("wait for subscription setup", flow.Sleep(5*time.Second)).
// Run the test with timeout
Step("publish and verify metadata properties", testMetadata).
Run()
}

func componentRuntimeOptions() []embedded.Option {
log := logger.NewLogger("dapr.components")