From 30fb81a6de65162ccbba9f689bad0d29496ce0e8 Mon Sep 17 00:00:00 2001 From: cnScarb Date: Wed, 27 Oct 2021 09:16:27 +0800 Subject: [PATCH] fix rocketmq message header properties garbled characters issue (#54) --- CHANGES.md | 1 + .../rocketMQ/v4/MessageSendInterceptor.java | 5 +- .../v4/MessageSendInterceptorTest.java | 54 +++++++++++++++++-- 3 files changed, 56 insertions(+), 4 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 46455b027f..bf03ed0a3e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -39,6 +39,7 @@ Release Notes. * Fix instrumentation v2 API doesn't work for constructor instrumentation. * Add plugin to support okhttp 2.x * Optimize okhttp 3.x 4.x plugin to get span time cost precisely +* Adapt message header properties of RocketMQ 4.9.x #### Documentation diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java index 50b8d19102..b27e438f6b 100644 --- a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java @@ -68,10 +68,13 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr while (next.hasNext()) { next = next.next(); if (!StringUtil.isEmpty(next.getHeadValue())) { + if (properties.length() > 0 && properties.charAt(properties.length() - 1) != PROPERTY_SEPARATOR) { + // adapt for RocketMQ 4.9.x or later + properties.append(PROPERTY_SEPARATOR); + } properties.append(next.getHeadKey()); properties.append(NAME_VALUE_SEPARATOR); properties.append(next.getHeadValue()); - properties.append(PROPERTY_SEPARATOR); } } requestHeader.setProperties(properties.toString()); diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptorTest.java index e9a9f3a04a..3f7f395c58 100644 --- a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptorTest.java +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptorTest.java @@ -19,9 +19,13 @@ package org.apache.skywalking.apm.plugin.rocketMQ.v4; import java.util.List; +import java.util.Map; + import org.apache.rocketmq.client.impl.CommunicationMode; import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; +import org.apache.skywalking.apm.agent.core.context.SW8ExtensionCarrierItem; import org.apache.skywalking.apm.agent.test.tools.SegmentStorage; import org.junit.Before; import org.junit.Rule; @@ -44,10 +48,17 @@ import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.powermock.api.mockito.PowerMockito.when; +import static org.apache.rocketmq.common.message.MessageDecoder.NAME_VALUE_SEPARATOR; +import static org.apache.rocketmq.common.message.MessageDecoder.PROPERTY_SEPARATOR; + @RunWith(PowerMockRunner.class) @PowerMockRunnerDelegate(TracingSegmentRunner.class) public class MessageSendInterceptorTest { @@ -108,8 +119,8 @@ public void setSkyWalkingDynamicField(Object value) { CommunicationMode.ASYNC, null }; - when(messageRequestHeader.getProperties()).thenReturn(""); when(message.getTags()).thenReturn("TagA"); + stubMessageRequestHeader("TAGS" + NAME_VALUE_SEPARATOR + "TagA" + PROPERTY_SEPARATOR); } @Test @@ -117,6 +128,13 @@ public void testSendMessage() throws Throwable { messageSendInterceptor.beforeMethod(enhancedInstance, null, arguments, null, null); messageSendInterceptor.afterMethod(enhancedInstance, null, arguments, null, null); + Map tags = MessageDecoder.string2messageProperties( + ((SendMessageRequestHeader) arguments[3]).getProperties()); + // check original header of TAGS + assertThat(tags.get("TAGS"), is("TagA")); + // check skywalking header + assertTrue(tags.containsKey(SW8ExtensionCarrierItem.HEADER_NAME)); + assertThat(segmentStorage.getTraceSegments().size(), is(1)); TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0); List spans = SegmentHelper.getSpans(traceSegment); @@ -127,15 +145,27 @@ public void testSendMessage() throws Throwable { SpanAssert.assertLayer(mqSpan, SpanLayer.MQ); SpanAssert.assertComponent(mqSpan, ComponentsDefine.ROCKET_MQ_PRODUCER); SpanAssert.assertTag(mqSpan, 0, "127.0.0.1"); - verify(messageRequestHeader).setProperties(anyString()); verify(callBack).setSkyWalkingDynamicField(Matchers.any()); } + @Test + public void testSendMessageNew() throws Throwable { + stubMessageRequestHeader("TAGS" + NAME_VALUE_SEPARATOR + "TagA"); + testSendMessage(); + } + @Test public void testSendMessageWithoutCallBack() throws Throwable { messageSendInterceptor.beforeMethod(enhancedInstance, null, argumentsWithoutCallback, null, null); messageSendInterceptor.afterMethod(enhancedInstance, null, argumentsWithoutCallback, null, null); + Map tags = MessageDecoder.string2messageProperties( + ((SendMessageRequestHeader) argumentsWithoutCallback[3]).getProperties()); + // check original header of TAGS + assertThat(tags.get("TAGS"), is("TagA")); + // check skywalking header + assertTrue(tags.containsKey(SW8ExtensionCarrierItem.HEADER_NAME)); + assertThat(segmentStorage.getTraceSegments().size(), is(1)); TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0); List spans = SegmentHelper.getSpans(traceSegment); @@ -146,7 +176,25 @@ public void testSendMessageWithoutCallBack() throws Throwable { SpanAssert.assertLayer(mqSpan, SpanLayer.MQ); SpanAssert.assertComponent(mqSpan, ComponentsDefine.ROCKET_MQ_PRODUCER); SpanAssert.assertTag(mqSpan, 0, "127.0.0.1"); - verify(messageRequestHeader).setProperties(anyString()); } + @Test + public void testSendMessageWithoutCallBackNew() throws Throwable { + stubMessageRequestHeader("TAGS" + NAME_VALUE_SEPARATOR + "TagA"); + testSendMessageWithoutCallBack(); + } + + private void stubMessageRequestHeader(String properties) { + messageRequestHeader = mock(SendMessageRequestHeader.class, RETURNS_DEEP_STUBS); + doAnswer(invocation -> { + String val = (String) invocation.getArguments()[0]; + when(messageRequestHeader.getProperties()).thenReturn(val); + return null; + }).when(messageRequestHeader).setProperties(anyString()); + when(messageRequestHeader.getProperties()).thenCallRealMethod(); + messageRequestHeader.setProperties(properties); + + arguments[3] = messageRequestHeader; + argumentsWithoutCallback[3] = messageRequestHeader; + } }