Skip to content

Commit 079fe61

Browse files
authored
Merge pull request #49 from fangjian0423/header-conversion!
[ISSUE-41] Optimize header conversion
2 parents 27b3389 + 03b3ec9 commit 079fe61

File tree

3 files changed

+79
-30
lines changed

3 files changed

+79
-30
lines changed

rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQHeaders.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
* Represents the RocketMQ message protocol that is used during the data exchange.
2121
*/
2222
public class RocketMQHeaders {
23+
public static final String PREFIX = "rocketmq_";
2324
public static final String KEYS = "KEYS";
2425
public static final String TAGS = "TAGS";
2526
public static final String TOPIC = "TOPIC";
@@ -30,5 +31,4 @@ public class RocketMQHeaders {
3031
public static final String QUEUE_ID = "QUEUE_ID";
3132
public static final String SYS_FLAG = "SYS_FLAG";
3233
public static final String TRANSACTION_ID = "TRANSACTION_ID";
33-
public static final String PROPERTIES = "PROPERTIES";
3434
}

rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java

Lines changed: 44 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.rocketmq.client.producer.LocalTransactionState;
2424
import org.apache.rocketmq.client.producer.TransactionListener;
2525
import org.apache.rocketmq.common.message.Message;
26+
import org.apache.rocketmq.common.message.MessageConst;
2627
import org.apache.rocketmq.common.message.MessageExt;
2728
import org.apache.rocketmq.remoting.RPCHook;
2829
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
@@ -31,10 +32,12 @@
3132
import org.springframework.messaging.MessageHeaders;
3233
import org.springframework.messaging.MessagingException;
3334
import org.springframework.messaging.support.MessageBuilder;
35+
import org.springframework.util.CollectionUtils;
3436
import org.springframework.util.StringUtils;
3537
import org.slf4j.Logger;
3638
import org.slf4j.LoggerFactory;
3739
import java.nio.charset.Charset;
40+
import java.util.Map;
3841
import java.util.Objects;
3942

4043
public class RocketMQUtil {
@@ -77,37 +80,48 @@ public static MessagingException convert(MQClientException e) {
7780

7881
public static org.springframework.messaging.Message convertToSpringMessage(
7982
org.apache.rocketmq.common.message.MessageExt message) {
80-
org.springframework.messaging.Message retMessage =
83+
MessageBuilder messageBuilder =
8184
MessageBuilder.withPayload(message.getBody()).
82-
setHeader(RocketMQHeaders.KEYS, message.getKeys()).
83-
setHeader(RocketMQHeaders.TAGS, message.getTags()).
84-
setHeader(RocketMQHeaders.TOPIC, message.getTopic()).
85-
setHeader(RocketMQHeaders.MESSAGE_ID, message.getMsgId()).
86-
setHeader(RocketMQHeaders.BORN_TIMESTAMP, message.getBornTimestamp()).
87-
setHeader(RocketMQHeaders.BORN_HOST, message.getBornHostString()).
88-
setHeader(RocketMQHeaders.FLAG, message.getFlag()).
89-
setHeader(RocketMQHeaders.QUEUE_ID, message.getQueueId()).
90-
setHeader(RocketMQHeaders.SYS_FLAG, message.getSysFlag()).
91-
setHeader(RocketMQHeaders.TRANSACTION_ID, message.getTransactionId()).
92-
setHeader(RocketMQHeaders.PROPERTIES, message.getProperties()).
93-
build();
94-
95-
return retMessage;
85+
setHeader(toRocketHeaderKey(RocketMQHeaders.KEYS), message.getKeys()).
86+
setHeader(toRocketHeaderKey(RocketMQHeaders.TAGS), message.getTags()).
87+
setHeader(toRocketHeaderKey(RocketMQHeaders.TOPIC), message.getTopic()).
88+
setHeader(toRocketHeaderKey(RocketMQHeaders.MESSAGE_ID), message.getMsgId()).
89+
setHeader(toRocketHeaderKey(RocketMQHeaders.BORN_TIMESTAMP), message.getBornTimestamp()).
90+
setHeader(toRocketHeaderKey(RocketMQHeaders.BORN_HOST), message.getBornHostString()).
91+
setHeader(toRocketHeaderKey(RocketMQHeaders.FLAG), message.getFlag()).
92+
setHeader(toRocketHeaderKey(RocketMQHeaders.QUEUE_ID), message.getQueueId()).
93+
setHeader(toRocketHeaderKey(RocketMQHeaders.SYS_FLAG), message.getSysFlag()).
94+
setHeader(toRocketHeaderKey(RocketMQHeaders.TRANSACTION_ID), message.getTransactionId());
95+
addUserProperties(message.getProperties(), messageBuilder);
96+
return messageBuilder.build();
97+
}
98+
99+
public static String toRocketHeaderKey(String rawKey) {
100+
return RocketMQHeaders.PREFIX + rawKey;
101+
}
102+
103+
private static void addUserProperties(Map<String, String> properties, MessageBuilder messageBuilder) {
104+
if (!CollectionUtils.isEmpty(properties)) {
105+
properties.forEach((key, val) -> {
106+
if (!MessageConst.STRING_HASH_SET.contains(key) && !MessageHeaders.ID.equals(key)
107+
&& !MessageHeaders.TIMESTAMP.equals(key)) {
108+
messageBuilder.setHeader(key, val);
109+
}
110+
});
111+
}
96112
}
97113

98114
public static org.springframework.messaging.Message convertToSpringMessage(
99115
org.apache.rocketmq.common.message.Message message) {
100-
org.springframework.messaging.Message retMessage =
116+
MessageBuilder messageBuilder =
101117
MessageBuilder.withPayload(message.getBody()).
102-
setHeader(RocketMQHeaders.KEYS, message.getKeys()).
103-
setHeader(RocketMQHeaders.TAGS, message.getTags()).
104-
setHeader(RocketMQHeaders.TOPIC, message.getTopic()).
105-
setHeader(RocketMQHeaders.FLAG, message.getFlag()).
106-
setHeader(RocketMQHeaders.TRANSACTION_ID, message.getTransactionId()).
107-
setHeader(RocketMQHeaders.PROPERTIES, message.getProperties()).
108-
build();
109-
110-
return retMessage;
118+
setHeader(toRocketHeaderKey(RocketMQHeaders.KEYS), message.getKeys()).
119+
setHeader(toRocketHeaderKey(RocketMQHeaders.TAGS), message.getTags()).
120+
setHeader(toRocketHeaderKey(RocketMQHeaders.TOPIC), message.getTopic()).
121+
setHeader(toRocketHeaderKey(RocketMQHeaders.FLAG), message.getFlag()).
122+
setHeader(toRocketHeaderKey(RocketMQHeaders.TRANSACTION_ID), message.getTransactionId());
123+
addUserProperties(message.getProperties(), messageBuilder);
124+
return messageBuilder.build();
111125
}
112126

113127
public static org.apache.rocketmq.common.message.Message convertToRocketMessage(
@@ -160,11 +174,12 @@ public static org.apache.rocketmq.common.message.Message convertToRocketMessage(
160174
rocketMsg.setWaitStoreMsgOK(waitStoreMsgOK);
161175

162176
headers.entrySet().stream()
163-
.filter(entry -> !Objects.equals(entry.getKey(), RocketMQHeaders.KEYS)
164-
&& !Objects.equals(entry.getKey(), "FLAG")
165-
&& !Objects.equals(entry.getKey(), "WAIT_STORE_MSG_OK")) // exclude "KEYS", "FLAG", "WAIT_STORE_MSG_OK"
177+
.filter(entry -> !Objects.equals(entry.getKey(), "FLAG")
178+
&& !Objects.equals(entry.getKey(), "WAIT_STORE_MSG_OK")) // exclude "FLAG", "WAIT_STORE_MSG_OK"
166179
.forEach(entry -> {
167-
rocketMsg.putUserProperty("USERS_" + entry.getKey(), String.valueOf(entry.getValue())); // add other properties with prefix "USERS_"
180+
if (!MessageConst.STRING_HASH_SET.contains(entry.getKey())) {
181+
rocketMsg.putUserProperty(entry.getKey(), String.valueOf(entry.getValue()));
182+
}
168183
});
169184

170185
}

rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQUtilTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.springframework.messaging.Message;
2424
import org.springframework.messaging.support.MessageBuilder;
2525

26+
import static org.junit.Assert.assertEquals;
27+
import static org.junit.Assert.assertNull;
2628
import static org.junit.Assert.assertTrue;
2729

2830
public class RocketMQUtilTest {
@@ -56,4 +58,36 @@ public void testPayload() {
5658
assertTrue(Arrays.equals((byte[])msgWithBytePayload.getPayload(), rocketMsg2.getBody()));
5759
}
5860

61+
@Test
62+
public void testHeaderConvertToRMQMsg() {
63+
Message msgWithStringPayload = MessageBuilder.withPayload("test body")
64+
.setHeader("test", 1)
65+
.setHeader(RocketMQHeaders.TAGS, "tags")
66+
.setHeader(RocketMQHeaders.KEYS, "my_keys")
67+
.build();
68+
org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
69+
"UTF-8", "test-topic", msgWithStringPayload);
70+
assertEquals(String.valueOf("1"), rocketMsg.getProperty("test"));
71+
assertNull(rocketMsg.getProperty(RocketMQHeaders.TAGS));
72+
assertEquals("my_keys", rocketMsg.getProperty(RocketMQHeaders.KEYS));
73+
}
74+
75+
@Test
76+
public void testHeaderConvertToSpringMsg() {
77+
org.apache.rocketmq.common.message.Message rmqMsg = new org.apache.rocketmq.common.message.Message();
78+
rmqMsg.setBody("test body".getBytes());
79+
rmqMsg.setTopic("test-topic");
80+
rmqMsg.putUserProperty("test", "1");
81+
rmqMsg.setTags("tags");
82+
Message springMsg = RocketMQUtil.convertToSpringMessage(rmqMsg);
83+
assertEquals(String.valueOf("1"), springMsg.getHeaders().get("test"));
84+
assertEquals("tags", springMsg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TAGS));
85+
86+
org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
87+
"UTF-8", "test-topic", springMsg);
88+
assertEquals(String.valueOf("1"), rocketMsg.getProperty("test"));
89+
assertEquals(String.valueOf("tags"), rocketMsg.getProperty(RocketMQHeaders.PREFIX + RocketMQHeaders.TAGS));
90+
assertNull(rocketMsg.getTags());
91+
}
92+
5993
}

0 commit comments

Comments
 (0)