|
58 | 58 | import org.springframework.context.ApplicationContextAware; |
59 | 59 | import org.springframework.context.SmartLifecycle; |
60 | 60 | import org.springframework.core.MethodParameter; |
| 61 | +import org.springframework.core.ResolvableType; |
61 | 62 | import org.springframework.messaging.Message; |
62 | 63 | import org.springframework.messaging.MessageHeaders; |
63 | 64 | import org.springframework.messaging.converter.MessageConversionException; |
|
66 | 67 | import org.springframework.messaging.support.MessageBuilder; |
67 | 68 | import org.springframework.util.Assert; |
68 | 69 | import org.springframework.util.MimeTypeUtils; |
| 70 | +import org.springframework.util.ReflectionUtils; |
69 | 71 |
|
70 | 72 | @SuppressWarnings("WeakerAccess") |
71 | 73 | public class DefaultRocketMQListenerContainer implements InitializingBean, |
@@ -553,61 +555,36 @@ private Object doConvertMessage(MessageExt messageExt) { |
553 | 555 |
|
554 | 556 | private MethodParameter getMethodParameter() { |
555 | 557 | Class<?> targetClass; |
| 558 | + Class<?> consumerInterface; |
556 | 559 | if (rocketMQListener != null) { |
557 | 560 | targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener); |
| 561 | + consumerInterface = RocketMQListener.class; |
558 | 562 | } else { |
559 | 563 | targetClass = AopProxyUtils.ultimateTargetClass(rocketMQReplyListener); |
| 564 | + consumerInterface = RocketMQReplyListener.class; |
560 | 565 | } |
561 | | - Type messageType = this.getMessageType(); |
562 | | - Class clazz = null; |
563 | | - if (messageType instanceof ParameterizedType && messageConverter instanceof SmartMessageConverter) { |
564 | | - clazz = (Class) ((ParameterizedType) messageType).getRawType(); |
565 | | - } else if (messageType instanceof Class) { |
566 | | - clazz = (Class) messageType; |
567 | | - } else { |
568 | | - throw new RuntimeException("parameterType:" + messageType + " of onMessage method is not supported"); |
569 | | - } |
570 | | - try { |
571 | | - final Method method = targetClass.getMethod("onMessage", clazz); |
572 | | - return new MethodParameter(method, 0); |
573 | | - } catch (NoSuchMethodException e) { |
574 | | - e.printStackTrace(); |
575 | | - throw new RuntimeException("parameterType:" + messageType + " of onMessage method is not supported"); |
576 | | - } |
| 566 | + ResolvableType resolvableType = ResolvableType.forClass(targetClass).as(consumerInterface); |
| 567 | + Class<?> methodParameterType = resolvableType.getGeneric().resolve(); |
| 568 | + Method onMessage = ReflectionUtils.findMethod(targetClass, "onMessage", methodParameterType); |
| 569 | + return MethodParameter.forExecutable(onMessage, 0); |
577 | 570 | } |
578 | 571 |
|
| 572 | + |
579 | 573 | private Type getMessageType() { |
580 | 574 | Class<?> targetClass; |
| 575 | + Class<?> consumerInterface; |
581 | 576 | if (rocketMQListener != null) { |
582 | 577 | targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener); |
| 578 | + consumerInterface = RocketMQListener.class; |
583 | 579 | } else { |
584 | 580 | targetClass = AopProxyUtils.ultimateTargetClass(rocketMQReplyListener); |
| 581 | + consumerInterface = RocketMQReplyListener.class; |
585 | 582 | } |
586 | | - Type matchedGenericInterface = null; |
587 | | - while (Objects.nonNull(targetClass)) { |
588 | | - Type[] interfaces = targetClass.getGenericInterfaces(); |
589 | | - if (Objects.nonNull(interfaces)) { |
590 | | - for (Type type : interfaces) { |
591 | | - if (type instanceof ParameterizedType && |
592 | | - (Objects.equals(((ParameterizedType) type).getRawType(), RocketMQListener.class) || Objects.equals(((ParameterizedType) type).getRawType(), RocketMQReplyListener.class))) { |
593 | | - matchedGenericInterface = type; |
594 | | - break; |
595 | | - } |
596 | | - } |
597 | | - } |
598 | | - targetClass = targetClass.getSuperclass(); |
599 | | - } |
600 | | - if (Objects.isNull(matchedGenericInterface)) { |
601 | | - return Object.class; |
602 | | - } |
603 | | - |
604 | | - Type[] actualTypeArguments = ((ParameterizedType) matchedGenericInterface).getActualTypeArguments(); |
605 | | - if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) { |
606 | | - return actualTypeArguments[0]; |
607 | | - } |
608 | | - return Object.class; |
| 583 | + ResolvableType resolvableType = ResolvableType.forClass(targetClass).as(consumerInterface); |
| 584 | + return resolvableType.getGeneric().getType(); |
609 | 585 | } |
610 | 586 |
|
| 587 | + |
611 | 588 | private void initRocketMQPushConsumer() throws MQClientException { |
612 | 589 | if (rocketMQListener == null && rocketMQReplyListener == null) { |
613 | 590 | throw new IllegalArgumentException("Property 'rocketMQListener' or 'rocketMQReplyListener' is required"); |
|
0 commit comments