From 9c29a4901899ec98e451a4d0c9544c846e050764 Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Mon, 29 Jul 2024 22:49:49 -0500 Subject: [PATCH] Allow custom object mapper for headers This commit adds support for a user-provided Jackson ObjectMapper to be used when deserializing JSON header values. See #723 --- .../ROOT/pages/reference/pulsar-header.adoc | 18 ++ .../MethodReactivePulsarListenerEndpoint.java | 19 +- ...arListenerAnnotationBeanPostProcessor.java | 4 + .../listener/ReactivePulsarListenerTests.java | 139 +++++++++++++-- .../PulsarHeaderObjectMapperUtils.java | 62 +++++++ ...arListenerAnnotationBeanPostProcessor.java | 3 + ...lsarReaderAnnotationBeanPostProcessor.java | 3 + .../config/MethodPulsarListenerEndpoint.java | 29 +-- .../config/MethodPulsarReaderEndpoint.java | 24 ++- ...ctPulsarMessageToSpringMessageAdapter.java | 21 ++- .../PulsarHeaderObjectMapperUtilsTests.java | 58 ++++++ .../pulsar/listener/PulsarListenerTests.java | 165 ++++++++++++++---- .../reader/PulsarReaderHeaderTests.java | 146 ++++++++++++++++ .../header/JsonPulsarHeaderMapperTests.java | 1 + 14 files changed, 608 insertions(+), 84 deletions(-) create mode 100644 spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarHeaderObjectMapperUtils.java create mode 100644 spring-pulsar/src/test/java/org/springframework/pulsar/annotation/PulsarHeaderObjectMapperUtilsTests.java create mode 100644 spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderHeaderTests.java diff --git a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/pulsar-header.adoc b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/pulsar-header.adoc index 0af63873d..f48446e3a 100644 --- a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/pulsar-header.adoc +++ b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/pulsar-header.adoc @@ -43,6 +43,24 @@ The `JsonPulsarHeaderMapper` has a property called `addToStringClasses()` that l During inbound mapping, they are mapped as `String`. By default, only `org.springframework.util.MimeType` and `org.springframework.http.MediaType` are mapped this way. +===== Custom ObjectMapper +The JSON mapper uses a reasonable configured Jasckson `ObjectMapper` to handle serialization of header values. +However, to provide a custom object mapper one must simply provide an `ObjectMapper` bean with the name `pulsarHeaderObjectMapper`. +For example: +[source, java] +---- +@Configuration(proxyBeanMethods = false) +static class PulsarHeadersCustomObjectMapperTestConfig { + + @Bean(name = "pulsarHeaderObjectMapper") + ObjectMapper customObjectMapper() { + var objectMapper = new ObjectMapper(); + // do things with your special header object mapper here + return objectMapper; + } +} +---- + === Inbound/Outbound Patterns On the inbound side, by default, all Pulsar headers (message metadata plus user properties) are mapped to `MessageHeaders`. On the outbound side, by default, all `MessageHeaders` are mapped, except `id`, `timestamp`, and the headers that represent the Pulsar message metadata. diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/MethodReactivePulsarListenerEndpoint.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/MethodReactivePulsarListenerEndpoint.java index 585a6f402..53bdf13e6 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/MethodReactivePulsarListenerEndpoint.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/MethodReactivePulsarListenerEndpoint.java @@ -31,7 +31,6 @@ import org.springframework.core.MethodParameter; import org.springframework.core.ResolvableType; import org.springframework.core.log.LogAccessor; -import org.springframework.expression.BeanResolver; import org.springframework.lang.Nullable; import org.springframework.messaging.converter.SmartMessageConverter; import org.springframework.messaging.handler.annotation.Header; @@ -53,6 +52,7 @@ import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; +import com.fasterxml.jackson.databind.ObjectMapper; import reactor.core.publisher.Flux; /** @@ -71,6 +71,8 @@ public class MethodReactivePulsarListenerEndpoint extends AbstractReactivePul private Method method; + private ObjectMapper objectMapper; + private MessageHandlerMethodFactory messageHandlerMethodFactory; private SmartMessageConverter messagingConverter; @@ -209,7 +211,6 @@ protected HandlerAdapter configureListenerAdapter(AbstractPulsarMessageToSpringM @SuppressWarnings({ "unchecked", "rawtypes" }) protected AbstractPulsarMessageToSpringMessageAdapter createMessageListenerInstance( @Nullable MessageConverter messageConverter) { - AbstractPulsarMessageToSpringMessageAdapter listener; if (isFluxListener()) { listener = new PulsarReactiveStreamingMessagingMessageListenerAdapter<>(this.bean, this.method); @@ -217,20 +218,26 @@ protected AbstractPulsarMessageToSpringMessageAdapter createMessageListenerIn else { listener = new PulsarReactiveOneByOneMessagingMessageListenerAdapter<>(this.bean, this.method); } - - if (messageConverter instanceof PulsarMessageConverter) { - listener.setMessageConverter((PulsarMessageConverter) messageConverter); + if (messageConverter instanceof PulsarMessageConverter pulsarMessageConverter) { + listener.setMessageConverter(pulsarMessageConverter); } if (this.messagingConverter != null) { listener.setMessagingConverter(this.messagingConverter); } - BeanResolver resolver = getBeanResolver(); + if (this.objectMapper != null) { + listener.setObjectMapper(this.objectMapper); + } + var resolver = getBeanResolver(); if (resolver != null) { listener.setBeanResolver(resolver); } return listener; } + public void setObjectMapper(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + public void setMessagingConverter(SmartMessageConverter messagingConverter) { this.messagingConverter = messagingConverter; } diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/annotation/ReactivePulsarListenerAnnotationBeanPostProcessor.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/annotation/ReactivePulsarListenerAnnotationBeanPostProcessor.java index 65ce39d26..8109bb0b5 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/annotation/ReactivePulsarListenerAnnotationBeanPostProcessor.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/annotation/ReactivePulsarListenerAnnotationBeanPostProcessor.java @@ -46,6 +46,7 @@ import org.springframework.format.FormatterRegistry; import org.springframework.lang.Nullable; import org.springframework.pulsar.annotation.AbstractPulsarAnnotationsBeanPostProcessor; +import org.springframework.pulsar.annotation.PulsarHeaderObjectMapperUtils; import org.springframework.pulsar.annotation.PulsarListenerConfigurer; import org.springframework.pulsar.config.PulsarAnnotationSupportBeanNames; import org.springframework.pulsar.config.PulsarListenerEndpointRegistrar; @@ -281,6 +282,9 @@ private void resolveDeadLetterPolicy(MethodReactivePulsarListenerEndpoint end @SuppressWarnings("unchecked") protected void postProcessEndpointsBeforeRegistration() { + PulsarHeaderObjectMapperUtils.customMapper(this.beanFactory) + .ifPresent((objectMapper) -> this.processedEndpoints + .forEach((endpoint) -> endpoint.setObjectMapper(objectMapper))); if (this.processedEndpoints.size() == 1) { MethodReactivePulsarListenerEndpoint endpoint = this.processedEndpoints.get(0); if (endpoint.getConsumerCustomizer() != null) { diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java index 2b86d8b61..c268e9cb6 100644 --- a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java @@ -50,6 +50,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.pulsar.annotation.EnablePulsar; import org.springframework.pulsar.core.DefaultPulsarProducerFactory; @@ -68,18 +69,23 @@ import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer; import org.springframework.pulsar.reactive.core.ReactivePulsarConsumerFactory; import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.BasicListenersTestCases.BasicListenersTestCasesConfig; +import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.PulsarHeadersCustomObjectMapperTest.PulsarHeadersCustomObjectMapperTestConfig; import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.PulsarHeadersTest.PulsarListenerWithHeadersConfig; import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.StreamingListenerTestCases.StreamingListenerTestCasesConfig; import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SubscriptionTypeTests.WithDefaultType.WithDefaultTypeConfig; import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SubscriptionTypeTests.WithSpecificTypes.WithSpecificTypesConfig; import org.springframework.pulsar.reactive.support.MessageUtils; import org.springframework.pulsar.support.PulsarHeaders; +import org.springframework.pulsar.support.header.JsonPulsarHeaderMapper; import org.springframework.pulsar.test.model.UserPojo; import org.springframework.pulsar.test.model.UserRecord; +import org.springframework.pulsar.test.model.json.UserRecordDeserializer; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.util.ReflectionTestUtils; import org.springframework.util.ObjectUtils; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -541,6 +547,7 @@ Mono listenString(String ignored) { class PulsarHeadersTest { static CountDownLatch simpleListenerLatch = new CountDownLatch(1); + static CountDownLatch simpleListenerPojoLatch = new CountDownLatch(1); static CountDownLatch pulsarMessageListenerLatch = new CountDownLatch(1); static CountDownLatch springMessagingMessageListenerLatch = new CountDownLatch(1); @@ -548,20 +555,46 @@ class PulsarHeadersTest { static AtomicReference messageId = new AtomicReference<>(); static AtomicReference topicName = new AtomicReference<>(); static AtomicReference fooValue = new AtomicReference<>(); + static AtomicReference pojoValue = new AtomicReference<>(); static AtomicReference rawData = new AtomicReference<>(); @Test void simpleListenerWithHeaders() throws Exception { - MessageId messageId = pulsarTemplate.newMessage("hello-simple-listener") + var topic = "rplt-simpleListenerWithHeaders"; + var msg = "hello-%s".formatted(topic); + MessageId messageId = pulsarTemplate.newMessage(msg) .withMessageCustomizer(messageBuilder -> messageBuilder.property("foo", "simpleListenerWithHeaders")) - .withTopic("simpleListenerWithHeaders") + .withTopic(topic) .send(); assertThat(simpleListenerLatch.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(capturedData.get()).isEqualTo("hello-simple-listener"); - assertThat(PulsarHeadersTest.messageId.get()).isEqualTo(messageId); - assertThat(topicName.get()).isEqualTo("persistent://public/default/simpleListenerWithHeaders"); - assertThat(fooValue.get()).isEqualTo("simpleListenerWithHeaders"); - assertThat(rawData.get()).isEqualTo("hello-simple-listener".getBytes(StandardCharsets.UTF_8)); + assertThat(PulsarHeadersTest.messageId).hasValue(messageId); + assertThat(topicName).hasValue("persistent://public/default/%s".formatted(topic)); + assertThat(capturedData).hasValue(msg); + assertThat(rawData).hasValue(msg.getBytes(StandardCharsets.UTF_8)); + assertThat(fooValue).hasValue("simpleListenerWithHeaders"); + } + + @Test + void simpleListenerWithPojoHeader() throws Exception { + var topic = "rplt-simpleListenerWithPojoHeader"; + var msg = "hello-%s".formatted(topic); + // In order to send complex headers (pojo) must manually map and set each + // header as follows + var user = new UserRecord("that", 100); + var headers = new HashMap(); + headers.put("user", user); + var headerMapper = JsonPulsarHeaderMapper.builder().build(); + var mappedHeaders = headerMapper.toPulsarHeaders(new MessageHeaders(headers)); + MessageId messageId = pulsarTemplate.newMessage(msg) + .withMessageCustomizer(messageBuilder -> mappedHeaders.forEach(messageBuilder::property)) + .withTopic(topic) + .send(); + assertThat(simpleListenerPojoLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(PulsarHeadersTest.messageId).hasValue(messageId); + assertThat(topicName).hasValue("persistent://public/default/%s".formatted(topic)); + assertThat(pojoValue).hasValue(user); + assertThat(capturedData).hasValue(msg); + assertThat(rawData).hasValue(msg.getBytes(StandardCharsets.UTF_8)); } @Test @@ -569,12 +602,12 @@ void pulsarMessageListenerWithHeaders() throws Exception { MessageId messageId = pulsarTemplate.newMessage("hello-pulsar-message-listener") .withMessageCustomizer( messageBuilder -> messageBuilder.property("foo", "pulsarMessageListenerWithHeaders")) - .withTopic("pulsarMessageListenerWithHeaders") + .withTopic("rplt-pulsarMessageListenerWithHeaders") .send(); assertThat(pulsarMessageListenerLatch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(capturedData.get()).isEqualTo("hello-pulsar-message-listener"); assertThat(PulsarHeadersTest.messageId.get()).isEqualTo(messageId); - assertThat(topicName.get()).isEqualTo("persistent://public/default/pulsarMessageListenerWithHeaders"); + assertThat(topicName.get()).isEqualTo("persistent://public/default/rplt-pulsarMessageListenerWithHeaders"); assertThat(fooValue.get()).isEqualTo("pulsarMessageListenerWithHeaders"); assertThat(rawData.get()).isEqualTo("hello-pulsar-message-listener".getBytes(StandardCharsets.UTF_8)); } @@ -584,13 +617,13 @@ void springMessagingMessageListenerWithHeaders() throws Exception { MessageId messageId = pulsarTemplate.newMessage("hello-spring-messaging-message-listener") .withMessageCustomizer( messageBuilder -> messageBuilder.property("foo", "springMessagingMessageListenerWithHeaders")) - .withTopic("springMessagingMessageListenerWithHeaders") + .withTopic("rplt-springMessagingMessageListenerWithHeaders") .send(); assertThat(springMessagingMessageListenerLatch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(capturedData.get()).isEqualTo("hello-spring-messaging-message-listener"); assertThat(PulsarHeadersTest.messageId.get()).isEqualTo(messageId); assertThat(topicName.get()) - .isEqualTo("persistent://public/default/springMessagingMessageListenerWithHeaders"); + .isEqualTo("persistent://public/default/rplt-springMessagingMessageListenerWithHeaders"); assertThat(fooValue.get()).isEqualTo("springMessagingMessageListenerWithHeaders"); assertThat(rawData.get()) .isEqualTo("hello-spring-messaging-message-listener".getBytes(StandardCharsets.UTF_8)); @@ -600,8 +633,9 @@ void springMessagingMessageListenerWithHeaders() throws Exception { @Configuration static class PulsarListenerWithHeadersConfig { - @ReactivePulsarListener(subscriptionName = "simple-listener-with-headers-sub", - topics = "simpleListenerWithHeaders", consumerCustomizer = "subscriptionInitialPositionEarliest") + @ReactivePulsarListener(topics = "rplt-simpleListenerWithHeaders", + subscriptionName = "rplt-simple-listener-with-headers-sub", + consumerCustomizer = "subscriptionInitialPositionEarliest") Mono simpleListenerWithHeaders(String data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId, @Header(PulsarHeaders.TOPIC_NAME) String topicName, @Header(PulsarHeaders.RAW_DATA) byte[] rawData, @Header("foo") String foo) { @@ -614,8 +648,23 @@ Mono simpleListenerWithHeaders(String data, @Header(PulsarHeaders.MESSAGE_ return Mono.empty(); } - @ReactivePulsarListener(subscriptionName = "pulsar-message-listener-with-headers-sub", - topics = "pulsarMessageListenerWithHeaders", + @ReactivePulsarListener(topics = "rplt-simpleListenerWithPojoHeader", + subscriptionName = "simpleListenerWithPojoHeader-sub", + consumerCustomizer = "subscriptionInitialPositionEarliest") + Mono simpleListenerWithPojoHeader(String data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId, + @Header(PulsarHeaders.TOPIC_NAME) String topicName, @Header(PulsarHeaders.RAW_DATA) byte[] rawData, + @Header("user") UserRecord user) { + capturedData.set(data); + PulsarHeadersTest.messageId.set(messageId); + PulsarHeadersTest.topicName.set(topicName); + pojoValue.set(user); + PulsarHeadersTest.rawData.set(rawData); + simpleListenerPojoLatch.countDown(); + return Mono.empty(); + } + + @ReactivePulsarListener(subscriptionName = "rplt-pulsar-message-listener-with-headers-sub", + topics = "rplt-pulsarMessageListenerWithHeaders", consumerCustomizer = "subscriptionInitialPositionEarliest") Mono pulsarMessageListenerWithHeaders(Message data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId, @@ -630,8 +679,8 @@ Mono pulsarMessageListenerWithHeaders(Message data, return Mono.empty(); } - @ReactivePulsarListener(subscriptionName = "pulsar-message-listener-with-headers-sub", - topics = "springMessagingMessageListenerWithHeaders", + @ReactivePulsarListener(subscriptionName = "rplt-pulsar-message-listener-with-headers-sub", + topics = "rplt-springMessagingMessageListenerWithHeaders", consumerCustomizer = "subscriptionInitialPositionEarliest") Mono springMessagingMessageListenerWithHeaders(org.springframework.messaging.Message data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId, @@ -650,6 +699,62 @@ Mono springMessagingMessageListenerWithHeaders(org.springframework.messagi } + @Nested + @ContextConfiguration(classes = PulsarHeadersCustomObjectMapperTestConfig.class) + class PulsarHeadersCustomObjectMapperTest { + + private static final String TOPIC = "rplt-listenerWithPojoHeader-custom"; + + private static final CountDownLatch listenerLatch = new CountDownLatch(1); + + private static UserRecord userPassedIntoListener; + + @Test + void whenPulsarHeaderObjectMapperIsDefinedThenItIsUsedToDeserializeHeaders() throws Exception { + var msg = "hello-%s".formatted(TOPIC); + // In order to send complex headers (pojo) must manually map and set each + // header as follows + var user = new UserRecord("that", 100); + var headers = new HashMap(); + headers.put("user", user); + var headerMapper = JsonPulsarHeaderMapper.builder().build(); + var mappedHeaders = headerMapper.toPulsarHeaders(new MessageHeaders(headers)); + pulsarTemplate.newMessage(msg) + .withMessageCustomizer(messageBuilder -> mappedHeaders.forEach(messageBuilder::property)) + .withTopic(TOPIC) + .send(); + // Custom deser adds suffix to name and bumps age + 5 + var expectedUser = new UserRecord(user.name() + "-deser", user.age() + 5); + assertThat(listenerLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(userPassedIntoListener).isEqualTo(expectedUser); + } + + @Configuration(proxyBeanMethods = false) + static class PulsarHeadersCustomObjectMapperTestConfig { + + @Bean(name = "pulsarHeaderObjectMapper") + ObjectMapper customObjectMapper() { + var objectMapper = new ObjectMapper(); + var module = new SimpleModule(); + module.addDeserializer(UserRecord.class, new UserRecordDeserializer()); + objectMapper.registerModule(module); + return objectMapper; + } + + @ReactivePulsarListener(topics = TOPIC, subscriptionName = TOPIC + "-sub", + consumerCustomizer = "subscriptionInitialPositionEarliest") + Mono listenerWithPojoHeader(String data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId, + @Header(PulsarHeaders.TOPIC_NAME) String topicName, @Header(PulsarHeaders.RAW_DATA) byte[] rawData, + @Header("user") UserRecord user) { + userPassedIntoListener = user; + listenerLatch.countDown(); + return Mono.empty(); + } + + } + + } + @Nested @ContextConfiguration(classes = PulsarListenerConcurrencyTestCases.TestPulsarListenersForConcurrency.class) class PulsarListenerConcurrencyTestCases { diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarHeaderObjectMapperUtils.java b/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarHeaderObjectMapperUtils.java new file mode 100644 index 000000000..2f33cbd37 --- /dev/null +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarHeaderObjectMapperUtils.java @@ -0,0 +1,62 @@ +/* + * Copyright 2023-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.annotation; + +import java.util.Optional; + +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.NoSuchBeanDefinitionException; +import org.springframework.core.log.LogAccessor; +import org.springframework.util.Assert; + +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * Resolves the {@link ObjectMapper} to use when serializing JSON header values. + * + * @author Chris Bono + * @since 1.2.0 + */ +public final class PulsarHeaderObjectMapperUtils { + + private static final String PULSAR_HEADER_OBJECT_MAPPER_BEAN_NAME = "pulsarHeaderObjectMapper"; + + private static final LogAccessor LOG = new LogAccessor(PulsarHeaderObjectMapperUtils.class); + + private PulsarHeaderObjectMapperUtils() { + } + + /** + * Gets the optional {@link ObjectMapper} to use when deserializing JSON header + * values. The mapper bean is expected to be registered with the name + * 'pulsarHeaderObjectMapper'. + * @param beanFactory the bean factory that may contain the mapper bean + * @return optional mapper or empty if bean not registered under the expected name + */ + public static Optional customMapper(BeanFactory beanFactory) { + Assert.notNull(beanFactory, "beanFactory must not be null"); + try { + return Optional.of(beanFactory.getBean(PULSAR_HEADER_OBJECT_MAPPER_BEAN_NAME, ObjectMapper.class)); + } + catch (NoSuchBeanDefinitionException ex) { + LOG.debug(() -> "No '%s' bean defined - will use standard object mapper for header values" + .formatted(PULSAR_HEADER_OBJECT_MAPPER_BEAN_NAME)); + } + return Optional.empty(); + } + +} diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarListenerAnnotationBeanPostProcessor.java b/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarListenerAnnotationBeanPostProcessor.java index f19d76952..59c570207 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarListenerAnnotationBeanPostProcessor.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarListenerAnnotationBeanPostProcessor.java @@ -270,6 +270,9 @@ private void resolvePulsarConsumerErrorHandler(MethodPulsarListenerEndpoint e @SuppressWarnings("unchecked") protected void postProcessEndpointsBeforeRegistration() { + PulsarHeaderObjectMapperUtils.customMapper(this.beanFactory) + .ifPresent((objectMapper) -> this.processedEndpoints + .forEach((endpoint) -> endpoint.setObjectMapper(objectMapper))); if (this.processedEndpoints.size() == 1) { MethodPulsarListenerEndpoint endpoint = this.processedEndpoints.get(0); if (endpoint.getConsumerBuilderCustomizer() != null) { diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarReaderAnnotationBeanPostProcessor.java b/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarReaderAnnotationBeanPostProcessor.java index 78f3f824e..cd1ee65e8 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarReaderAnnotationBeanPostProcessor.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarReaderAnnotationBeanPostProcessor.java @@ -237,6 +237,9 @@ else if (startMessageIdString.equalsIgnoreCase("latest")) { @SuppressWarnings("unchecked") protected void postProcessEndpointsBeforeRegistration() { + PulsarHeaderObjectMapperUtils.customMapper(this.beanFactory) + .ifPresent((objectMapper) -> this.processedEndpoints + .forEach((endpoint) -> endpoint.setObjectMapper(objectMapper))); if (this.processedEndpoints.size() == 1) { MethodPulsarReaderEndpoint endpoint = this.processedEndpoints.get(0); if (endpoint.getReaderBuilderCustomizer() != null) { diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarListenerEndpoint.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarListenerEndpoint.java index 860397714..8652db6cb 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarListenerEndpoint.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarListenerEndpoint.java @@ -32,7 +32,6 @@ import org.springframework.core.MethodParameter; import org.springframework.core.ResolvableType; import org.springframework.core.log.LogAccessor; -import org.springframework.expression.BeanResolver; import org.springframework.lang.Nullable; import org.springframework.messaging.converter.SmartMessageConverter; import org.springframework.messaging.handler.annotation.Header; @@ -57,6 +56,8 @@ import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; +import com.fasterxml.jackson.databind.ObjectMapper; + /** * A {@link PulsarListenerEndpoint} providing the method to invoke to process an incoming * message for this endpoint. @@ -74,6 +75,8 @@ public class MethodPulsarListenerEndpoint extends AbstractPulsarListenerEndpo private Method method; + private ObjectMapper objectMapper; + private MessageHandlerMethodFactory messageHandlerMethodFactory; private SmartMessageConverter messagingConverter; @@ -111,6 +114,10 @@ public Method getMethod() { return this.method; } + public void setObjectMapper(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory messageHandlerMethodFactory) { this.messageHandlerMethodFactory = messageHandlerMethodFactory; } @@ -235,25 +242,23 @@ protected HandlerAdapter configureListenerAdapter(AbstractPulsarMessageToSpringM @SuppressWarnings({ "unchecked", "rawtypes" }) protected AbstractPulsarMessageToSpringMessageAdapter createMessageListenerInstance( @Nullable MessageConverter messageConverter) { - AbstractPulsarMessageToSpringMessageAdapter listener; if (isBatchListener()) { - PulsarBatchMessagesToSpringMessageListenerAdapter messageListener = new PulsarBatchMessagesToSpringMessageListenerAdapter<>( - this.bean, this.method); - listener = messageListener; + listener = new PulsarBatchMessagesToSpringMessageListenerAdapter<>(this.bean, this.method); } else { - PulsarRecordMessageToSpringMessageListenerAdapter messageListener = new PulsarRecordMessageToSpringMessageListenerAdapter<>( - this.bean, this.method); - if (messageConverter instanceof PulsarMessageConverter) { - messageListener.setMessageConverter((PulsarMessageConverter) messageConverter); - } - listener = messageListener; + listener = new PulsarRecordMessageToSpringMessageListenerAdapter<>(this.bean, this.method); + } + if (messageConverter instanceof PulsarMessageConverter pulsarMessageConverter) { + listener.setMessageConverter(pulsarMessageConverter); } if (this.messagingConverter != null) { listener.setMessagingConverter(this.messagingConverter); } - BeanResolver resolver = getBeanResolver(); + if (this.objectMapper != null) { + listener.setObjectMapper(this.objectMapper); + } + var resolver = getBeanResolver(); if (resolver != null) { listener.setBeanResolver(resolver); } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarReaderEndpoint.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarReaderEndpoint.java index b38db4178..d39b7745c 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarReaderEndpoint.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarReaderEndpoint.java @@ -29,7 +29,6 @@ import org.springframework.core.MethodParameter; import org.springframework.core.ResolvableType; import org.springframework.core.log.LogAccessor; -import org.springframework.expression.BeanResolver; import org.springframework.lang.Nullable; import org.springframework.messaging.converter.SmartMessageConverter; import org.springframework.messaging.handler.annotation.Header; @@ -48,6 +47,8 @@ import org.springframework.pulsar.support.converter.PulsarMessageConverter; import org.springframework.util.Assert; +import com.fasterxml.jackson.databind.ObjectMapper; + /** * A {@link PulsarReaderEndpoint} providing the method to invoke to process an incoming * message for this endpoint. @@ -63,6 +64,8 @@ public class MethodPulsarReaderEndpoint extends AbstractPulsarReaderEndpoint< private Method method; + private ObjectMapper objectMapper; + private SmartMessageConverter messagingConverter; private MessageHandlerMethodFactory messageHandlerMethodFactory; @@ -89,6 +92,10 @@ public Method getMethod() { return this.method; } + public void setObjectMapper(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + @Override protected AbstractPulsarMessageToSpringMessageAdapter createReaderListener( PulsarMessageReaderContainer container, @Nullable MessageConverter messageConverter) { @@ -172,19 +179,18 @@ protected HandlerAdapter configureListenerAdapter(AbstractPulsarMessageToSpringM @SuppressWarnings({ "unchecked", "rawtypes" }) protected AbstractPulsarMessageToSpringMessageAdapter createMessageListenerInstance( @Nullable MessageConverter messageConverter) { - - AbstractPulsarMessageToSpringMessageAdapter listener; - PulsarRecordMessageToSpringMessageReaderAdapter messageListener = new PulsarRecordMessageToSpringMessageReaderAdapter<>( + AbstractPulsarMessageToSpringMessageAdapter listener = new PulsarRecordMessageToSpringMessageReaderAdapter<>( this.bean, this.method); - if (messageConverter instanceof PulsarMessageConverter) { - messageListener.setMessageConverter((PulsarMessageConverter) messageConverter); + if (messageConverter instanceof PulsarMessageConverter pulsarMessageConverter) { + listener.setMessageConverter(pulsarMessageConverter); } - listener = messageListener; - if (this.messagingConverter != null) { listener.setMessagingConverter(this.messagingConverter); } - BeanResolver resolver = getBeanResolver(); + if (this.objectMapper != null) { + listener.setObjectMapper(this.objectMapper); + } + var resolver = getBeanResolver(); if (resolver != null) { listener.setBeanResolver(resolver); } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/adapter/AbstractPulsarMessageToSpringMessageAdapter.java b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/adapter/AbstractPulsarMessageToSpringMessageAdapter.java index a88878116..3126b171f 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/adapter/AbstractPulsarMessageToSpringMessageAdapter.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/adapter/AbstractPulsarMessageToSpringMessageAdapter.java @@ -43,6 +43,8 @@ import org.springframework.pulsar.support.header.JsonPulsarHeaderMapper; import org.springframework.util.Assert; +import com.fasterxml.jackson.databind.ObjectMapper; + /** * An abstract {@link org.apache.pulsar.client.api.MessageListener} adapter providing the * necessary infrastructure to extract the payload from a Pulsar message. @@ -79,21 +81,21 @@ public abstract class AbstractPulsarMessageToSpringMessageAdapter { private boolean isConsumerRecords; - private boolean converterSet; + private boolean customConverterSet; - private PulsarMessageConverter messageConverter = new PulsarRecordMessageConverter( - JsonPulsarHeaderMapper.builder().build()); + private PulsarMessageConverter messageConverter; private Type fallbackType = Object.class; public AbstractPulsarMessageToSpringMessageAdapter(Object bean, Method method) { this.bean = bean; + this.messageConverter = new PulsarRecordMessageConverter(JsonPulsarHeaderMapper.builder().build()); this.inferredType = determineInferredType(method); } public void setMessageConverter(PulsarMessageConverter messageConverter) { this.messageConverter = messageConverter; - this.converterSet = true; + this.customConverterSet = true; } protected final PulsarMessageConverter getMessageConverter() { @@ -101,11 +103,18 @@ protected final PulsarMessageConverter getMessageConverter() { } public void setMessagingConverter(SmartMessageConverter messageConverter) { - Assert.isTrue(!this.converterSet, "Cannot set the SmartMessageConverter when setting the messageConverter, " - + "add the SmartConverter to the message converter instead"); + Assert.isTrue(!this.customConverterSet, "Cannot set the SmartMessageConverter on a custom messageConverter - " + + "add the SmartConverter to the custom converter instead"); ((PulsarRecordMessageConverter) this.messageConverter).setMessagingConverter(messageConverter); } + public void setObjectMapper(ObjectMapper objectMapper) { + Assert.isTrue(!this.customConverterSet, "Cannot set the ObjectMapper on a custom messageConverter - " + + "set the ObjectMapper on the custom converter instead"); + this.messageConverter = new PulsarRecordMessageConverter( + JsonPulsarHeaderMapper.builder().objectMapper(objectMapper).build()); + } + protected Type getType() { return this.inferredType == null ? this.fallbackType : this.inferredType; } diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/annotation/PulsarHeaderObjectMapperUtilsTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/annotation/PulsarHeaderObjectMapperUtilsTests.java new file mode 100644 index 000000000..37d3ab8b8 --- /dev/null +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/annotation/PulsarHeaderObjectMapperUtilsTests.java @@ -0,0 +1,58 @@ +/* + * Copyright 2023-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.annotation; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.NoSuchBeanDefinitionException; + +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * Tests for {@link PulsarHeaderObjectMapperUtils}. + */ +class PulsarHeaderObjectMapperUtilsTests { + + @Test + void whenCustomMapperDefinedItIsReturned() { + var mapper = new ObjectMapper(); + var beanFactory = mock(BeanFactory.class); + when(beanFactory.getBean("pulsarHeaderObjectMapper", ObjectMapper.class)).thenReturn(mapper); + assertThat(PulsarHeaderObjectMapperUtils.customMapper(beanFactory)).hasValue(mapper); + } + + @Test + void whenCustomMapperIsNotDefinedEmptyIsReturned() { + var beanFactory = mock(BeanFactory.class); + when(beanFactory.getBean("pulsarHeaderObjectMapper", ObjectMapper.class)) + .thenThrow(new NoSuchBeanDefinitionException("pulsarHeaderObjectMapper")); + assertThat(PulsarHeaderObjectMapperUtils.customMapper(beanFactory)).isEmpty(); + } + + @Test + void whenBeanFactoryNullExceptionIsThrown() { + assertThatIllegalArgumentException().isThrownBy(() -> PulsarHeaderObjectMapperUtils.customMapper(null)) + .withMessage("beanFactory must not be null"); + } + +} diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java index 73a136426..e8ae9e411 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java @@ -18,10 +18,12 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.springframework.pulsar.listener.PulsarListenerTests.PulsarHeadersTest.PulsarListenerWithHeadersConfig; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Properties; import java.util.concurrent.CountDownLatch; @@ -54,6 +56,7 @@ import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.pulsar.annotation.EnablePulsar; import org.springframework.pulsar.annotation.PulsarListener; @@ -69,14 +72,20 @@ import org.springframework.pulsar.core.PulsarTemplate; import org.springframework.pulsar.core.SchemaResolver; import org.springframework.pulsar.core.TopicResolver; +import org.springframework.pulsar.listener.PulsarListenerTests.PulsarHeadersCustomObjectMapperTest.PulsarHeadersCustomObjectMapperTestConfig; import org.springframework.pulsar.listener.PulsarListenerTests.SubscriptionTypeTests.WithDefaultType.WithDefaultTypeConfig; import org.springframework.pulsar.listener.PulsarListenerTests.SubscriptionTypeTests.WithSpecificTypes.WithSpecificTypesConfig; import org.springframework.pulsar.support.PulsarHeaders; +import org.springframework.pulsar.support.header.JsonPulsarHeaderMapper; import org.springframework.pulsar.test.model.UserPojo; import org.springframework.pulsar.test.model.UserRecord; +import org.springframework.pulsar.test.model.json.UserRecordDeserializer; import org.springframework.test.context.ContextConfiguration; import org.springframework.util.backoff.FixedBackOff; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; + /** * @author Soby Chacko * @author Alexander Preuß @@ -678,10 +687,11 @@ void listenString(String ignored) { } @Nested - @ContextConfiguration(classes = PulsarListenerTests.PulsarHeadersTest.PulsarListerWithHeadersConfig.class) + @ContextConfiguration(classes = PulsarListenerWithHeadersConfig.class) class PulsarHeadersTest { static CountDownLatch simpleListenerLatch = new CountDownLatch(1); + static CountDownLatch simpleListenerPojoLatch = new CountDownLatch(1); static CountDownLatch pulsarMessageListenerLatch = new CountDownLatch(1); static CountDownLatch springMessagingMessageListenerLatch = new CountDownLatch(1); @@ -689,6 +699,7 @@ class PulsarHeadersTest { static volatile MessageId messageId; static volatile String topicName; static volatile String fooValue; + static volatile Object pojoValue; static volatile byte[] rawData; static CountDownLatch simpleBatchListenerLatch = new CountDownLatch(1); @@ -705,27 +716,50 @@ class PulsarHeadersTest { void simpleListenerWithHeaders() throws Exception { MessageId messageId = pulsarTemplate.newMessage("hello-simple-listener") .withMessageCustomizer(messageBuilder -> messageBuilder.property("foo", "simpleListenerWithHeaders")) - .withTopic("simpleListenerWithHeaders") + .withTopic("plt-simpleListenerWithHeaders") .send(); assertThat(simpleListenerLatch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(capturedData).isEqualTo("hello-simple-listener"); assertThat(PulsarHeadersTest.messageId).isEqualTo(messageId); - assertThat(topicName).isEqualTo("persistent://public/default/simpleListenerWithHeaders"); + assertThat(topicName).isEqualTo("persistent://public/default/plt-simpleListenerWithHeaders"); assertThat(fooValue).isEqualTo("simpleListenerWithHeaders"); assertThat(rawData).isEqualTo("hello-simple-listener".getBytes(StandardCharsets.UTF_8)); } + @Test + void simpleListenerWithPojoHeader() throws Exception { + var topic = "plt-simpleListenerWithPojoHeader"; + var msg = "hello-%s".formatted(topic); + // In order to send complex headers (pojo) must manually map and set each + // header as follows + var user = new UserRecord("that", 100); + var headers = new HashMap(); + headers.put("user", user); + var headerMapper = JsonPulsarHeaderMapper.builder().build(); + var mappedHeaders = headerMapper.toPulsarHeaders(new MessageHeaders(headers)); + MessageId messageId = pulsarTemplate.newMessage(msg) + .withMessageCustomizer(messageBuilder -> mappedHeaders.forEach(messageBuilder::property)) + .withTopic(topic) + .send(); + assertThat(simpleListenerPojoLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(PulsarHeadersTest.messageId).isEqualTo(messageId); + assertThat(topicName).isEqualTo("persistent://public/default/%s".formatted(topic)); + assertThat(pojoValue).isEqualTo(user); + assertThat(capturedData).isEqualTo(msg); + assertThat(rawData).isEqualTo(msg.getBytes(StandardCharsets.UTF_8)); + } + @Test void pulsarMessageListenerWithHeaders() throws Exception { MessageId messageId = pulsarTemplate.newMessage("hello-pulsar-message-listener") .withMessageCustomizer( messageBuilder -> messageBuilder.property("foo", "pulsarMessageListenerWithHeaders")) - .withTopic("pulsarMessageListenerWithHeaders") + .withTopic("plt-pulsarMessageListenerWithHeaders") .send(); assertThat(pulsarMessageListenerLatch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(capturedData).isEqualTo("hello-pulsar-message-listener"); assertThat(PulsarHeadersTest.messageId).isEqualTo(messageId); - assertThat(topicName).isEqualTo("persistent://public/default/pulsarMessageListenerWithHeaders"); + assertThat(topicName).isEqualTo("persistent://public/default/plt-pulsarMessageListenerWithHeaders"); assertThat(fooValue).isEqualTo("pulsarMessageListenerWithHeaders"); assertThat(rawData).isEqualTo("hello-pulsar-message-listener".getBytes(StandardCharsets.UTF_8)); } @@ -735,12 +769,13 @@ void springMessagingMessageListenerWithHeaders() throws Exception { MessageId messageId = pulsarTemplate.newMessage("hello-spring-messaging-message-listener") .withMessageCustomizer( messageBuilder -> messageBuilder.property("foo", "springMessagingMessageListenerWithHeaders")) - .withTopic("springMessagingMessageListenerWithHeaders") + .withTopic("plt-springMessagingMessageListenerWithHeaders") .send(); assertThat(springMessagingMessageListenerLatch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(capturedData).isEqualTo("hello-spring-messaging-message-listener"); assertThat(PulsarHeadersTest.messageId).isEqualTo(messageId); - assertThat(topicName).isEqualTo("persistent://public/default/springMessagingMessageListenerWithHeaders"); + assertThat(topicName) + .isEqualTo("persistent://public/default/plt-springMessagingMessageListenerWithHeaders"); assertThat(fooValue).isEqualTo("springMessagingMessageListenerWithHeaders"); assertThat(rawData).isEqualTo("hello-spring-messaging-message-listener".getBytes(StandardCharsets.UTF_8)); } @@ -750,12 +785,13 @@ void simpleBatchListenerWithHeaders() throws Exception { MessageId messageId = pulsarTemplate.newMessage("hello-simple-batch-listener") .withMessageCustomizer( messageBuilder -> messageBuilder.property("foo", "simpleBatchListenerWithHeaders")) - .withTopic("simpleBatchListenerWithHeaders") + .withTopic("plt-simpleBatchListenerWithHeaders") .send(); assertThat(simpleBatchListenerLatch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(capturedBatchData).containsExactly("hello-simple-batch-listener"); assertThat(batchMessageIds).containsExactly(messageId); - assertThat(batchTopicNames).containsExactly("persistent://public/default/simpleBatchListenerWithHeaders"); + assertThat(batchTopicNames) + .containsExactly("persistent://public/default/plt-simpleBatchListenerWithHeaders"); assertThat(batchFooValues).containsExactly("simpleBatchListenerWithHeaders"); } @@ -764,12 +800,12 @@ void pulsarMessageBatchListenerWithHeaders() throws Exception { MessageId messageId = pulsarTemplate.newMessage("hello-pulsar-message-batch-listener") .withMessageCustomizer( messageBuilder -> messageBuilder.property("foo", "pulsarMessageBatchListenerWithHeaders")) - .withTopic("pulsarMessageBatchListenerWithHeaders") + .withTopic("plt-pulsarMessageBatchListenerWithHeaders") .send(); assertThat(pulsarMessageBatchListenerLatch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(capturedBatchData).containsExactly("hello-pulsar-message-batch-listener"); assertThat(batchTopicNames) - .containsExactly("persistent://public/default/pulsarMessageBatchListenerWithHeaders"); + .containsExactly("persistent://public/default/plt-pulsarMessageBatchListenerWithHeaders"); assertThat(batchFooValues).containsExactly("pulsarMessageBatchListenerWithHeaders"); assertThat(batchMessageIds).containsExactly(messageId); } @@ -779,12 +815,12 @@ void springMessagingMessageBatchListenerWithHeaders() throws Exception { MessageId messageId = pulsarTemplate.newMessage("hello-spring-messaging-message-batch-listener") .withMessageCustomizer(messageBuilder -> messageBuilder.property("foo", "springMessagingMessageBatchListenerWithHeaders")) - .withTopic("springMessagingMessageBatchListenerWithHeaders") + .withTopic("plt-springMessagingMessageBatchListenerWithHeaders") .send(); assertThat(springMessagingMessageBatchListenerLatch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(capturedBatchData).containsExactly("hello-spring-messaging-message-batch-listener"); assertThat(batchTopicNames) - .containsExactly("persistent://public/default/springMessagingMessageBatchListenerWithHeaders"); + .containsExactly("persistent://public/default/plt-springMessagingMessageBatchListenerWithHeaders"); assertThat(batchFooValues).containsExactly("springMessagingMessageBatchListenerWithHeaders"); assertThat(batchMessageIds).containsExactly(messageId); } @@ -794,21 +830,22 @@ void pulsarMessagesBatchListenerWithHeaders() throws Exception { MessageId messageId = pulsarTemplate.newMessage("hello-pulsar-messages-batch-listener") .withMessageCustomizer( messageBuilder -> messageBuilder.property("foo", "pulsarMessagesBatchListenerWithHeaders")) - .withTopic("pulsarMessagesBatchListenerWithHeaders") + .withTopic("plt-pulsarMessagesBatchListenerWithHeaders") .send(); assertThat(pulsarMessagesBatchListenerLatch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(capturedBatchData).containsExactly("hello-pulsar-messages-batch-listener"); assertThat(batchTopicNames) - .containsExactly("persistent://public/default/pulsarMessagesBatchListenerWithHeaders"); + .containsExactly("persistent://public/default/plt-pulsarMessagesBatchListenerWithHeaders"); assertThat(batchFooValues).containsExactly("pulsarMessagesBatchListenerWithHeaders"); assertThat(batchMessageIds).containsExactly(messageId); } @EnablePulsar @Configuration - static class PulsarListerWithHeadersConfig { + static class PulsarListenerWithHeadersConfig { - @PulsarListener(subscriptionName = "simple-listener-with-headers-sub", topics = "simpleListenerWithHeaders") + @PulsarListener(subscriptionName = "plt-simple-listener-with-headers-sub", + topics = "plt-simpleListenerWithHeaders") void simpleListenerWithHeaders(String data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId, @Header(PulsarHeaders.TOPIC_NAME) String topicName, @Header(PulsarHeaders.RAW_DATA) byte[] rawData, @Header("foo") String foo) { @@ -820,8 +857,21 @@ void simpleListenerWithHeaders(String data, @Header(PulsarHeaders.MESSAGE_ID) Me simpleListenerLatch.countDown(); } - @PulsarListener(subscriptionName = "pulsar-message-listener-with-headers-sub", - topics = "pulsarMessageListenerWithHeaders") + @PulsarListener(topics = "plt-simpleListenerWithPojoHeader", + subscriptionName = "plt-simpleListenerWithPojoHeader-sub") + void simpleListenerWithPojoHeader(String data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId, + @Header(PulsarHeaders.TOPIC_NAME) String topicName, @Header(PulsarHeaders.RAW_DATA) byte[] rawData, + @Header("user") UserRecord user) { + capturedData = data; + PulsarHeadersTest.messageId = messageId; + PulsarHeadersTest.topicName = topicName; + pojoValue = user; + PulsarHeadersTest.rawData = rawData; + simpleListenerPojoLatch.countDown(); + } + + @PulsarListener(subscriptionName = "plt-pulsar-message-listener-with-headers-sub", + topics = "plt-pulsarMessageListenerWithHeaders") void pulsarMessageListenerWithHeaders(Message data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId, @Header(PulsarHeaders.TOPIC_NAME) String topicName, @Header(PulsarHeaders.RAW_DATA) byte[] rawData, @@ -834,8 +884,8 @@ void pulsarMessageListenerWithHeaders(Message data, pulsarMessageListenerLatch.countDown(); } - @PulsarListener(subscriptionName = "pulsar-message-listener-with-headers-sub", - topics = "springMessagingMessageListenerWithHeaders") + @PulsarListener(subscriptionName = "plt-pulsar-message-listener-with-headers-sub", + topics = "plt-springMessagingMessageListenerWithHeaders") void springMessagingMessageListenerWithHeaders(org.springframework.messaging.Message data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId, @Header(PulsarHeaders.RAW_DATA) byte[] rawData, @Header(PulsarHeaders.TOPIC_NAME) String topicName, @@ -848,8 +898,8 @@ void springMessagingMessageListenerWithHeaders(org.springframework.messaging.Mes springMessagingMessageListenerLatch.countDown(); } - @PulsarListener(subscriptionName = "simple-batch-listener-with-headers-sub", - topics = "simpleBatchListenerWithHeaders", batch = true) + @PulsarListener(subscriptionName = "plt-simple-batch-listener-with-headers-sub", + topics = "plt-simpleBatchListenerWithHeaders", batch = true) void simpleBatchListenerWithHeaders(List data, @Header(PulsarHeaders.MESSAGE_ID) List messageIds, @Header(PulsarHeaders.TOPIC_NAME) List topicNames, @Header("foo") List fooValues) { @@ -860,46 +910,41 @@ void simpleBatchListenerWithHeaders(List data, simpleBatchListenerLatch.countDown(); } - @PulsarListener(subscriptionName = "pulsarMessage-batch-listener-with-headers-sub", - topics = "pulsarMessageBatchListenerWithHeaders", batch = true) + @PulsarListener(subscriptionName = "plt-pulsarMessage-batch-listener-with-headers-sub", + topics = "plt-pulsarMessageBatchListenerWithHeaders", batch = true) void pulsarMessageBatchListenerWithHeaders(List> data, @Header(PulsarHeaders.MESSAGE_ID) List messageIds, @Header(PulsarHeaders.TOPIC_NAME) List topicNames, @Header("foo") List fooValues) { - capturedBatchData = data.stream().map(Message::getValue).collect(Collectors.toList()); - batchMessageIds = messageIds; batchTopicNames = topicNames; batchFooValues = fooValues; pulsarMessageBatchListenerLatch.countDown(); } - @PulsarListener(subscriptionName = "spring-messaging-message-batch-listener-with-headers-sub", - topics = "springMessagingMessageBatchListenerWithHeaders", batch = true) + @PulsarListener(subscriptionName = "plt-spring-messaging-message-batch-listener-with-headers-sub", + topics = "plt-springMessagingMessageBatchListenerWithHeaders", batch = true) void springMessagingMessageBatchListenerWithHeaders( List> data, @Header(PulsarHeaders.MESSAGE_ID) List messageIds, @Header(PulsarHeaders.TOPIC_NAME) List topicNames, @Header("foo") List fooValues) { - capturedBatchData = data.stream() .map(org.springframework.messaging.Message::getPayload) .collect(Collectors.toList()); - batchMessageIds = messageIds; batchTopicNames = topicNames; batchFooValues = fooValues; springMessagingMessageBatchListenerLatch.countDown(); } - @PulsarListener(subscriptionName = "pulsarMessages-batch-listener-with-headers-sub", - topics = "pulsarMessagesBatchListenerWithHeaders", batch = true) + @PulsarListener(subscriptionName = "plt-pulsarMessages-batch-listener-with-headers-sub", + topics = "plt-pulsarMessagesBatchListenerWithHeaders", batch = true) void pulsarMessagesBatchListenerWithHeaders(Messages data, @Header(PulsarHeaders.MESSAGE_ID) List messageIds, @Header(PulsarHeaders.TOPIC_NAME) List topicNames, @Header("foo") List fooValues) { List list = new ArrayList<>(); data.iterator().forEachRemaining(m -> list.add(m.getValue())); capturedBatchData = list; - batchMessageIds = messageIds; batchTopicNames = topicNames; batchFooValues = fooValues; @@ -910,6 +955,58 @@ void pulsarMessagesBatchListenerWithHeaders(Messages data, } + @Nested + @ContextConfiguration(classes = PulsarHeadersCustomObjectMapperTestConfig.class) + class PulsarHeadersCustomObjectMapperTest { + + private static final String TOPIC = "plt-listenerWithPojoHeaderCustom"; + + private static final CountDownLatch listenerLatch = new CountDownLatch(1); + + private static UserRecord userPassedIntoListener; + + @Test + void whenPulsarHeaderObjectMapperIsDefinedThenItIsUsedToDeserializeHeaders() throws Exception { + var msg = "hello-%s".formatted(TOPIC); + // In order to send complex headers (pojo) must manually map and set each + // header as follows + var user = new UserRecord("that", 100); + var headers = new HashMap(); + headers.put("user", user); + var headerMapper = JsonPulsarHeaderMapper.builder().build(); + var mappedHeaders = headerMapper.toPulsarHeaders(new MessageHeaders(headers)); + MessageId messageId = pulsarTemplate.newMessage(msg) + .withMessageCustomizer(messageBuilder -> mappedHeaders.forEach(messageBuilder::property)) + .withTopic(TOPIC) + .send(); + // Custom deser adds suffix to name and bumps age + 5 + var expectedUser = new UserRecord(user.name() + "-deser", user.age() + 5); + assertThat(listenerLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(userPassedIntoListener).isEqualTo(expectedUser); + } + + @Configuration(proxyBeanMethods = false) + static class PulsarHeadersCustomObjectMapperTestConfig { + + @Bean(name = "pulsarHeaderObjectMapper") + ObjectMapper customObjectMapper() { + var objectMapper = new ObjectMapper(); + var module = new SimpleModule(); + module.addDeserializer(UserRecord.class, new UserRecordDeserializer()); + objectMapper.registerModule(module); + return objectMapper; + } + + @PulsarListener(topics = TOPIC, subscriptionName = TOPIC + "-sub") + void listenerWithPojoHeader(String ignored, @Header("user") UserRecord user) { + userPassedIntoListener = user; + listenerLatch.countDown(); + } + + } + + } + @Nested @ContextConfiguration(classes = ConsumerPauseTest.ConsumerPauseConfig.class) class ConsumerPauseTest { diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderHeaderTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderHeaderTests.java new file mode 100644 index 000000000..fec7f17af --- /dev/null +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderHeaderTests.java @@ -0,0 +1,146 @@ +/* + * Copyright 2022-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.reader; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.HashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.client.api.MessageId; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.pulsar.annotation.PulsarReader; +import org.springframework.pulsar.reader.PulsarReaderHeaderTests.WithCustomObjectMapperTest.WithCustomObjectMapperTestConfig; +import org.springframework.pulsar.reader.PulsarReaderHeaderTests.WithStandardObjectMapperTest.WithStandardObjectMapperTestConfig; +import org.springframework.pulsar.support.header.JsonPulsarHeaderMapper; +import org.springframework.pulsar.test.model.UserRecord; +import org.springframework.pulsar.test.model.json.UserRecordDeserializer; +import org.springframework.test.context.ContextConfiguration; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; + +/** + * Tests consuming records with header in {@link PulsarReader @PulsarReader}. + * + * @author Chris Bono + */ +class PulsarReaderHeaderTests extends PulsarReaderTestsBase { + + @Nested + @ContextConfiguration(classes = WithStandardObjectMapperTestConfig.class) + class WithStandardObjectMapperTest { + + private static final String TOPIC = "prht-with-standard-mapper-topic"; + + private static CountDownLatch listenerLatch = new CountDownLatch(1); + + private static UserRecord userPassedIntoListener; + + @Test + void whenObjectMapperIsNotDefinedThenStandardMapperUsedToDeserHeaders() throws Exception { + var msg = "hello-%s".formatted(TOPIC); + // In order to send complex headers (pojo) must manually map and set each + // header as follows + var user = new UserRecord("that", 100); + var headers = new HashMap(); + headers.put("user", user); + var headerMapper = JsonPulsarHeaderMapper.builder().build(); + var mappedHeaders = headerMapper.toPulsarHeaders(new MessageHeaders(headers)); + pulsarTemplate.newMessage(msg) + .withMessageCustomizer(messageBuilder -> mappedHeaders.forEach(messageBuilder::property)) + .withTopic(TOPIC) + .send(); + assertThat(listenerLatch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(userPassedIntoListener).isEqualTo(user); + } + + @Configuration(proxyBeanMethods = false) + static class WithStandardObjectMapperTestConfig { + + @PulsarReader(topics = TOPIC, startMessageId = "earliest") + public void listenWithHeaders(org.apache.pulsar.client.api.Message msg, + @Header("user") UserRecord user) { + userPassedIntoListener = user; + listenerLatch.countDown(); + } + + } + + } + + @Nested + @ContextConfiguration(classes = WithCustomObjectMapperTestConfig.class) + class WithCustomObjectMapperTest { + + private static final String TOPIC = "prht-with-custom-mapper-topic"; + + private static CountDownLatch listenerLatch = new CountDownLatch(1); + + private static UserRecord userPassedIntoListener; + + @Test + void whenObjectMapperIsDefinedThenItIsUsedToDeserHeaders() throws Exception { + var msg = "hello-%s".formatted(TOPIC); + // In order to send complex headers (pojo) must manually map and set each + // header as follows + var user = new UserRecord("that", 100); + var headers = new HashMap(); + headers.put("user", user); + var headerMapper = JsonPulsarHeaderMapper.builder().build(); + var mappedHeaders = headerMapper.toPulsarHeaders(new MessageHeaders(headers)); + MessageId messageId = pulsarTemplate.newMessage(msg) + .withMessageCustomizer(messageBuilder -> mappedHeaders.forEach(messageBuilder::property)) + .withTopic(TOPIC) + .send(); + // Custom deser adds suffix to name and bumps age + 5 + var expectedUser = new UserRecord(user.name() + "-deser", user.age() + 5); + assertThat(listenerLatch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(userPassedIntoListener).isEqualTo(expectedUser); + } + + @Configuration(proxyBeanMethods = false) + static class WithCustomObjectMapperTestConfig { + + @Bean(name = "pulsarHeaderObjectMapper") + ObjectMapper customObjectMapper() { + var objectMapper = new ObjectMapper(); + var module = new SimpleModule(); + module.addDeserializer(UserRecord.class, new UserRecordDeserializer()); + objectMapper.registerModule(module); + return objectMapper; + } + + @PulsarReader(topics = TOPIC, startMessageId = "earliest") + public void listenWithHeaders(org.apache.pulsar.client.api.Message msg, + @Header("user") UserRecord user) { + userPassedIntoListener = user; + listenerLatch.countDown(); + } + + } + + } + +} diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/support/header/JsonPulsarHeaderMapperTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/support/header/JsonPulsarHeaderMapperTests.java index 9a07549d1..e31d6d885 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/support/header/JsonPulsarHeaderMapperTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/support/header/JsonPulsarHeaderMapperTests.java @@ -118,6 +118,7 @@ void springHeadersWithObjectValues() { var headers = new HashMap(); headers.put("foo", "bar"); headers.put("uuid", uuid); + var mapped = mapper().toPulsarHeaders(new MessageHeaders(headers)); assertThat(mapper().toPulsarHeaders(new MessageHeaders(headers))).containsEntry("foo", "bar") .containsEntry("uuid", "\"%s\"".formatted(uuid.toString())) .extractingByKey(JSON_TYPES, InstanceOfAssertFactories.STRING)