2020import java .lang .reflect .Method ;
2121import java .lang .reflect .ParameterizedType ;
2222import java .lang .reflect .Type ;
23+ import java .lang .reflect .TypeVariable ;
2324import java .nio .charset .Charset ;
2425import java .util .List ;
2526import java .util .Objects ;
27+
2628import org .apache .rocketmq .client .AccessChannel ;
2729import org .apache .rocketmq .client .consumer .DefaultMQPushConsumer ;
2830import org .apache .rocketmq .client .consumer .MessageSelector ;
5860import org .springframework .context .ApplicationContextAware ;
5961import org .springframework .context .SmartLifecycle ;
6062import org .springframework .core .MethodParameter ;
63+ import org .springframework .core .ResolvableType ;
6164import org .springframework .messaging .Message ;
6265import org .springframework .messaging .MessageHeaders ;
6366import org .springframework .messaging .converter .MessageConversionException ;
6669import org .springframework .messaging .support .MessageBuilder ;
6770import org .springframework .util .Assert ;
6871import org .springframework .util .MimeTypeUtils ;
72+ import org .springframework .util .ReflectionUtils ;
6973
7074@ SuppressWarnings ("WeakerAccess" )
7175public class DefaultRocketMQListenerContainer implements InitializingBean ,
@@ -538,6 +542,8 @@ private Object doConvertMessage(MessageExt messageExt) {
538542 if (messageType instanceof Class ) {
539543 //if the messageType has not Generic Parameter
540544 return this .getMessageConverter ().fromMessage (MessageBuilder .withPayload (str ).build (), (Class <?>) messageType );
545+ } else if (messageType instanceof TypeVariable ) {
546+ return this .getMessageConverter ().fromMessage (MessageBuilder .withPayload (str ).build (), Object .class );
541547 } else {
542548 //if the messageType has Generic Parameter, then use SmartMessageConverter#fromMessage with third parameter "conversionHint".
543549 //we have validate the MessageConverter is SmartMessageConverter in this#getMethodParameter.
@@ -553,61 +559,37 @@ private Object doConvertMessage(MessageExt messageExt) {
553559
554560 private MethodParameter getMethodParameter () {
555561 Class <?> targetClass ;
562+ Class <?> consumerInterface ;
556563 if (rocketMQListener != null ) {
557564 targetClass = AopProxyUtils .ultimateTargetClass (rocketMQListener );
565+ consumerInterface = RocketMQListener .class ;
558566 } else {
559567 targetClass = AopProxyUtils .ultimateTargetClass (rocketMQReplyListener );
568+ consumerInterface = RocketMQReplyListener .class ;
560569 }
561- Type messageType = this .getMessageType ();
562- Class clazz = null ;
563- if (messageType instanceof ParameterizedType && messageConverter instanceof SmartMessageConverter ) {
564- clazz = (Class ) ((ParameterizedType ) messageType ).getRawType ();
565- } else if (messageType instanceof Class ) {
566- clazz = (Class ) messageType ;
567- } else {
568- throw new RuntimeException ("parameterType:" + messageType + " of onMessage method is not supported" );
569- }
570- try {
571- final Method method = targetClass .getMethod ("onMessage" , clazz );
572- return new MethodParameter (method , 0 );
573- } catch (NoSuchMethodException e ) {
574- e .printStackTrace ();
575- throw new RuntimeException ("parameterType:" + messageType + " of onMessage method is not supported" );
576- }
570+ ResolvableType resolvableType = ResolvableType .forClass (targetClass ).as (consumerInterface );
571+ Class <?> methodParameterType = resolvableType .getGeneric ().resolve (Object .class );
572+ Method onMessage = ReflectionUtils .findMethod (targetClass , "onMessage" , methodParameterType );
573+ return MethodParameter .forExecutable (onMessage , 0 );
577574 }
578575
576+
579577 private Type getMessageType () {
580578 Class <?> targetClass ;
579+ Class <?> consumerInterface ;
581580 if (rocketMQListener != null ) {
582581 targetClass = AopProxyUtils .ultimateTargetClass (rocketMQListener );
582+ consumerInterface = RocketMQListener .class ;
583583 } else {
584584 targetClass = AopProxyUtils .ultimateTargetClass (rocketMQReplyListener );
585+ consumerInterface = RocketMQReplyListener .class ;
585586 }
586- Type matchedGenericInterface = null ;
587- while (Objects .nonNull (targetClass )) {
588- Type [] interfaces = targetClass .getGenericInterfaces ();
589- if (Objects .nonNull (interfaces )) {
590- for (Type type : interfaces ) {
591- if (type instanceof ParameterizedType &&
592- (Objects .equals (((ParameterizedType ) type ).getRawType (), RocketMQListener .class ) || Objects .equals (((ParameterizedType ) type ).getRawType (), RocketMQReplyListener .class ))) {
593- matchedGenericInterface = type ;
594- break ;
595- }
596- }
597- }
598- targetClass = targetClass .getSuperclass ();
599- }
600- if (Objects .isNull (matchedGenericInterface )) {
601- return Object .class ;
602- }
603-
604- Type [] actualTypeArguments = ((ParameterizedType ) matchedGenericInterface ).getActualTypeArguments ();
605- if (Objects .nonNull (actualTypeArguments ) && actualTypeArguments .length > 0 ) {
606- return actualTypeArguments [0 ];
607- }
608- return Object .class ;
587+ ResolvableType resolvableType = ResolvableType .forClass (targetClass ).as (consumerInterface );
588+ Type messageType = resolvableType .getGeneric ().getType ();
589+ return messageType ;
609590 }
610591
592+
611593 private void initRocketMQPushConsumer () throws MQClientException {
612594 if (rocketMQListener == null && rocketMQReplyListener == null ) {
613595 throw new IllegalArgumentException ("Property 'rocketMQListener' or 'rocketMQReplyListener' is required" );
0 commit comments