Skip to content

Commit 972dfc7

Browse files
authored
Merge pull request #1063 from yanmxa/br_mqtt_stru
Fix the `content-type` issue for the MQTT protocol
2 parents e8fccd2 + 5754cf9 commit 972dfc7

File tree

3 files changed

+15
-15
lines changed

3 files changed

+15
-15
lines changed

protocol/mqtt_paho/v2/message.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@ import (
1717
)
1818

1919
const (
20-
prefix = "ce-"
21-
contentType = "Content-Type"
20+
prefix = "ce-"
2221
)
2322

2423
var specs = spec.WithPrefix(prefix)
@@ -41,8 +40,7 @@ func NewMessage(msg *paho.Publish) *Message {
4140
var f format.Format
4241
var v spec.Version
4342
if msg.Properties != nil {
44-
// Use properties.User["Content-type"] to determine if message is structured
45-
if s := msg.Properties.User.Get(contentType); format.IsFormat(s) {
43+
if s := msg.Properties.ContentType; format.IsFormat(s) {
4644
f = format.Lookup(s)
4745
} else if s := msg.Properties.User.Get(specs.PrefixedSpecVersionName()); s != "" {
4846
v = specs.Version(s)
@@ -88,14 +86,20 @@ func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter)
8886
} else {
8987
err = encoder.SetExtension(strings.TrimPrefix(userProperty.Key, prefix), userProperty.Value)
9088
}
91-
} else if userProperty.Key == contentType {
92-
err = encoder.SetAttribute(m.version.AttributeFromKind(spec.DataContentType), string(userProperty.Value))
9389
}
9490
if err != nil {
9591
return
9692
}
9793
}
9894

95+
contentType := m.internal.Properties.ContentType
96+
if contentType != "" {
97+
err = encoder.SetAttribute(m.version.AttributeFromKind(spec.DataContentType), contentType)
98+
if err != nil {
99+
return err
100+
}
101+
}
102+
99103
if m.internal.Payload != nil {
100104
return encoder.SetData(bytes.NewBuffer(m.internal.Payload))
101105
}

protocol/mqtt_paho/v2/message_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func TestReadStructured(t *testing.T) {
3232
msg: &paho.Publish{
3333
Payload: []byte(""),
3434
Properties: &paho.PublishProperties{
35-
User: []paho.UserProperty{{Key: contentType, Value: event.ApplicationCloudEventsJSON}},
35+
ContentType: event.ApplicationCloudEventsJSON,
3636
},
3737
},
3838
},

protocol/mqtt_paho/v2/write_message.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,9 @@ var (
4242

4343
func (b *pubMessageWriter) SetStructuredEvent(ctx context.Context, f format.Format, event io.Reader) error {
4444
if b.Properties == nil {
45-
b.Properties = &paho.PublishProperties{
46-
User: make([]paho.UserProperty, 0),
47-
}
45+
b.Properties = &paho.PublishProperties{}
4846
}
49-
b.Properties.User.Add(contentType, f.MediaType())
47+
b.Properties.ContentType = f.MediaType()
5048
var buf bytes.Buffer
5149
_, err := io.Copy(&buf, event)
5250
if err != nil {
@@ -85,15 +83,13 @@ func (b *pubMessageWriter) SetData(reader io.Reader) error {
8583
func (b *pubMessageWriter) SetAttribute(attribute spec.Attribute, value interface{}) error {
8684
if attribute.Kind() == spec.DataContentType {
8785
if value == nil {
88-
b.removeProperty(contentType)
86+
b.Properties.ContentType = ""
8987
}
9088
s, err := types.Format(value)
9189
if err != nil {
9290
return err
9391
}
94-
if err := b.addProperty(contentType, s); err != nil {
95-
return err
96-
}
92+
b.Properties.ContentType = s
9793
} else {
9894
if value == nil {
9995
b.removeProperty(prefix + attribute.Name())

0 commit comments

Comments
 (0)