diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListenerBeanPostProcessor.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListenerBeanPostProcessor.java index 6674dd26..2a390f53 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListenerBeanPostProcessor.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListenerBeanPostProcessor.java @@ -21,7 +21,11 @@ import java.util.Map; import java.util.function.BiFunction; import java.util.stream.Collectors; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.spring.support.RocketMQMessageListenerContainerRegistrar; +import org.springframework.aop.framework.AopProxyUtils; +import org.springframework.aop.scope.ScopedProxyUtils; import org.springframework.aop.support.AopUtils; import org.springframework.beans.BeansException; import org.springframework.beans.factory.ObjectProvider; @@ -29,10 +33,13 @@ import org.springframework.context.SmartLifecycle; import org.springframework.core.OrderComparator; import org.springframework.core.annotation.AnnotationUtils; +import org.springframework.util.ClassUtils; public class RocketMQMessageListenerBeanPostProcessor implements BeanPostProcessor, SmartLifecycle { - private AnnotationEnhancer enhancer; + private static final Logger log = LoggerFactory.getLogger(RocketMQMessageListenerBeanPostProcessor.class); + + private final AnnotationEnhancer enhancer; private final ObjectProvider registrarObjectProvider; @@ -61,15 +68,76 @@ public Object postProcessBeforeInitialization(Object bean, String beanName) thro @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { - Class targetClass = AopUtils.getTargetClass(bean); - RocketMQMessageListener ann = targetClass.getAnnotation(RocketMQMessageListener.class); + Class targetClass = AopProxyUtils.ultimateTargetClass(bean); + + RocketMQMessageListener ann = AnnotationUtils.findAnnotation(targetClass, RocketMQMessageListener.class); + if (ann == null) { + ann = AnnotationUtils.findAnnotation(bean.getClass(), RocketMQMessageListener.class); + } + if (ann != null) { RocketMQMessageListener enhance = enhance(targetClass, ann); registrarObjectProvider.ifAvailable(registrar -> registrar.registerContainer(beanName, bean, enhance)); + return bean; } + + boolean isAopProxy = AopUtils.isAopProxy(bean); + boolean isScopedTarget = ScopedProxyUtils.isScopedTarget(beanName); + boolean hasRefreshScope = hasRefreshScopeAnnotation(bean.getClass(), targetClass); + boolean likelyListener = isLikelyListenerBean(bean.getClass(), targetClass); + + if (likelyListener && (isAopProxy || isScopedTarget || hasRefreshScope)) { + log.warn("[RocketMQ] Bean '{}' (class={}) appears to be proxied or annotated with @RefreshScope. " + + "@RocketMQMessageListener on proxied beans may not be detected or registered. " + + "Recommended: do NOT put @RefreshScope on listener classes. " + + "Instead, extract refreshable configs to a separate @RefreshScope bean and inject it. " + + "See: https://github.com/apache/rocketmq/issues/9564", + beanName, bean.getClass().getName()); + } + return bean; } + private boolean hasRefreshScopeAnnotation(Class proxyClass, Class targetClass) { + try { + if (ClassUtils.isPresent("org.springframework.cloud.context.config.annotation.RefreshScope", + proxyClass.getClassLoader())) { + Class refreshScopeClass = ClassUtils.forName( + "org.springframework.cloud.context.config.annotation.RefreshScope", + proxyClass.getClassLoader()); + return AnnotationUtils.findAnnotation(targetClass, (Class)refreshScopeClass) != null + || AnnotationUtils.findAnnotation(proxyClass, (Class)refreshScopeClass) != null; + } + } + catch (Throwable ignored) { + // ignore + } + return false; + } + + private boolean isLikelyListenerBean(Class proxyClass, Class targetClass) { + try { + Class listener = ClassUtils.forName( + "org.apache.rocketmq.spring.core.RocketMQListener", proxyClass.getClassLoader()); + if (listener.isAssignableFrom(proxyClass) || listener.isAssignableFrom(targetClass)) { + return true; + } + } + catch (Throwable ignored) { + } + try { + Class replyListener = ClassUtils.forName( + "org.apache.rocketmq.spring.core.RocketMQReplyListener", proxyClass.getClassLoader()); + if (replyListener.isAssignableFrom(proxyClass) || replyListener.isAssignableFrom(targetClass)) { + return true; + } + } + catch (Throwable ignored) { + } + + return false; + } + @Override public int getPhase() { return Integer.MAX_VALUE - 2000; @@ -88,15 +156,15 @@ public void stop() { } - public void setRunning(boolean running) { - this.running = running; - } - @Override public boolean isRunning() { return running; } + public void setRunning(boolean running) { + this.running = running; + } + private RocketMQMessageListener enhance(AnnotatedElement element, RocketMQMessageListener ann) { if (this.enhancer == null) { return ann; diff --git a/rocketmq-spring-boot/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/rocketmq-spring-boot/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index c61a5eb4..d5a51541 100644 --- a/rocketmq-spring-boot/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/rocketmq-spring-boot/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration \ No newline at end of file diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListenerBeanPostProcessorTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListenerBeanPostProcessorTest.java index 4bddd578..90919263 100644 --- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListenerBeanPostProcessorTest.java +++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListenerBeanPostProcessorTest.java @@ -25,6 +25,8 @@ import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Scope; +import org.springframework.context.annotation.ScopedProxyMode; import static org.assertj.core.api.Assertions.assertThat; @@ -34,26 +36,35 @@ public class RocketMQMessageListenerBeanPostProcessorTest { private static final String TEST_CLASS_SIMPLE_NAME = "Receiver"; private ApplicationContextRunner runner = new ApplicationContextRunner() - .withConfiguration(AutoConfigurations.of(RocketMQAutoConfiguration.class)); + .withConfiguration(AutoConfigurations.of(RocketMQAutoConfiguration.class)); @Test public void testAnnotationEnhancer() { runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876"). - withUserConfiguration(TestAnnotationEnhancerConfig.class, TestReceiverConfig.class). - run((context) -> { - // Started container failed. DefaultRocketMQListenerContainer{consumerGroup='Receiver-Custom-Consumer-Group' ** - assertThat(context).getFailure().hasMessageContaining("connect to null failed"); - }); + withUserConfiguration(TestAnnotationEnhancerConfig.class, TestReceiverConfig.class). + run((context) -> { + // Started container failed. DefaultRocketMQListenerContainer{consumerGroup='Receiver-Custom-Consumer-Group' ** + assertThat(context).getFailure().hasMessageContaining("connect to null failed"); + }); } + @Test + public void testProxiedListenerAnnotationDetected() { + runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876") + .withUserConfiguration(TestProxyConfig.class) + .run((context) -> { + assertThat(context).getFailure().hasMessageContaining("connect to"); + }); + } + @Configuration static class TestAnnotationEnhancerConfig { @Bean public RocketMQMessageListenerBeanPostProcessor.AnnotationEnhancer consumeContainerEnhancer() { return (attrs, element) -> { if (element instanceof Class) { - Class targetClass = (Class) element; + Class targetClass = (Class)element; String classSimpleName = targetClass.getSimpleName(); if (TEST_CLASS_SIMPLE_NAME.equals(classSimpleName)) { String consumerGroup = "Receiver-Custom-Consumer-Group"; @@ -81,4 +92,20 @@ public void onMessage(Object message) { } } + + @Configuration + static class TestProxyConfig { + @Bean + @Scope(proxyMode = ScopedProxyMode.TARGET_CLASS) + public Object proxiedReceiverListener() { + return new ProxiedReceiver(); + } + } + + @RocketMQMessageListener(consumerGroup = "proxy-group", topic = "test-proxy") + static class ProxiedReceiver implements RocketMQListener { + @Override + public void onMessage(Object message) { + } + } } diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListenerBeanPostProcessor.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListenerBeanPostProcessor.java index 53d5cabd..e7624f50 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListenerBeanPostProcessor.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListenerBeanPostProcessor.java @@ -16,7 +16,16 @@ */ package org.apache.rocketmq.client.annotation; +import java.lang.reflect.AnnotatedElement; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; +import java.util.stream.Collectors; import org.apache.rocketmq.client.autoconfigure.ListenerContainerConfiguration; +import org.apache.rocketmq.shaded.org.slf4j.Logger; +import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory; +import org.springframework.aop.framework.AopProxyUtils; +import org.springframework.aop.scope.ScopedProxyUtils; import org.springframework.aop.support.AopUtils; import org.springframework.beans.BeansException; import org.springframework.beans.factory.InitializingBean; @@ -26,15 +35,12 @@ import org.springframework.context.SmartLifecycle; import org.springframework.core.OrderComparator; import org.springframework.core.annotation.AnnotationUtils; - -import java.lang.reflect.AnnotatedElement; -import java.util.List; -import java.util.Map; -import java.util.function.BiFunction; -import java.util.stream.Collectors; +import org.springframework.util.ClassUtils; public class RocketMQMessageListenerBeanPostProcessor implements ApplicationContextAware, BeanPostProcessor, InitializingBean, SmartLifecycle { + private static final Logger log = LoggerFactory.getLogger(RocketMQMessageListenerBeanPostProcessor.class); + private ApplicationContext applicationContext; private AnnotationEnhancer enhancer; @@ -50,17 +56,78 @@ public Object postProcessBeforeInitialization(Object bean, String beanName) thro @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { - Class targetClass = AopUtils.getTargetClass(bean); - RocketMQMessageListener ann = targetClass.getAnnotation(RocketMQMessageListener.class); + Class targetClass = AopProxyUtils.ultimateTargetClass(bean); + + RocketMQMessageListener ann = AnnotationUtils.findAnnotation(targetClass, RocketMQMessageListener.class); + if (ann == null) { + ann = AnnotationUtils.findAnnotation(bean.getClass(), RocketMQMessageListener.class); + } + if (ann != null) { RocketMQMessageListener enhance = enhance(targetClass, ann); if (listenerContainerConfiguration != null) { listenerContainerConfiguration.registerContainer(beanName, bean, enhance); } + return bean; + } + + boolean isAopProxy = AopUtils.isAopProxy(bean); + boolean isScopedTarget = ScopedProxyUtils.isScopedTarget(beanName); + boolean hasRefreshScope = hasRefreshScopeAnnotation(bean.getClass(), targetClass); + boolean likelyListener = isLikelyListenerBean(bean.getClass(), targetClass); + + if (likelyListener && (isAopProxy || isScopedTarget || hasRefreshScope)) { + log.warn("[RocketMQ] Bean '{}' (class={}) appears to be proxied or annotated with @RefreshScope. " + + "@RocketMQMessageListener on proxied beans may not be detected or registered. " + + "Recommended: do NOT put @RefreshScope on listener classes. " + + "Instead, extract refreshable configs to a separate @RefreshScope bean and inject it. " + + "See: https://github.com/apache/rocketmq/issues/9564", + beanName, bean.getClass().getName()); } + return bean; } + private boolean hasRefreshScopeAnnotation(Class proxyClass, Class targetClass) { + try { + if (ClassUtils.isPresent("org.springframework.cloud.context.config.annotation.RefreshScope", + proxyClass.getClassLoader())) { + Class refreshScopeClass = ClassUtils.forName( + "org.springframework.cloud.context.config.annotation.RefreshScope", + proxyClass.getClassLoader()); + return AnnotationUtils.findAnnotation(targetClass, (Class)refreshScopeClass) != null + || AnnotationUtils.findAnnotation(proxyClass, (Class)refreshScopeClass) != null; + } + } + catch (Throwable ignored) { + // ignore + } + return false; + } + + private boolean isLikelyListenerBean(Class proxyClass, Class targetClass) { + try { + Class listener = ClassUtils.forName( + "org.apache.rocketmq.spring.core.RocketMQListener", proxyClass.getClassLoader()); + if (listener.isAssignableFrom(proxyClass) || listener.isAssignableFrom(targetClass)) { + return true; + } + } + catch (Throwable ignored) { + } + try { + Class replyListener = ClassUtils.forName( + "org.apache.rocketmq.spring.core.RocketMQReplyListener", proxyClass.getClassLoader()); + if (replyListener.isAssignableFrom(proxyClass) || replyListener.isAssignableFrom(targetClass)) { + return true; + } + } + catch (Throwable ignored) { + } + + return false; + } + @Override public int getPhase() { return Integer.MAX_VALUE - 2000; @@ -79,16 +146,15 @@ public void stop() { } - public void setRunning(boolean running) { - this.running = running; - } - - @Override public boolean isRunning() { return running; } + public void setRunning(boolean running) { + this.running = running; + } + @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; @@ -103,12 +169,12 @@ public void afterPropertiesSet() throws Exception { private void buildEnhancer() { if (this.applicationContext != null) { Map enhancersMap = - this.applicationContext.getBeansOfType(AnnotationEnhancer.class, false, false); + this.applicationContext.getBeansOfType(AnnotationEnhancer.class, false, false); if (enhancersMap.size() > 0) { List enhancers = enhancersMap.values() - .stream() - .sorted(new OrderComparator()) - .collect(Collectors.toList()); + .stream() + .sorted(new OrderComparator()) + .collect(Collectors.toList()); this.enhancer = (attrs, element) -> { Map newAttrs = attrs; for (AnnotationEnhancer enh : enhancers) { @@ -123,9 +189,10 @@ private void buildEnhancer() { private RocketMQMessageListener enhance(AnnotatedElement element, RocketMQMessageListener ann) { if (this.enhancer == null) { return ann; - } else { + } + else { return AnnotationUtils.synthesizeAnnotation( - this.enhancer.apply(AnnotationUtils.getAnnotationAttributes(ann), element), RocketMQMessageListener.class, null); + this.enhancer.apply(AnnotationUtils.getAnnotationAttributes(ann), element), RocketMQMessageListener.class, null); } } diff --git a/rocketmq-v5-client-spring-boot/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/rocketmq-v5-client-spring-boot/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index 4dd19c2d..1abba53c 100644 --- a/rocketmq-v5-client-spring-boot/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/rocketmq-v5-client-spring-boot/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. org.apache.rocketmq.client.autoconfigure.RocketMQAutoConfiguration \ No newline at end of file