Skip to content

Commit

Permalink
Allow custom object mapper for headers
Browse files Browse the repository at this point in the history
This commit adds support for a user-provided Jackson ObjectMapper to be
used when deserializing JSON header values.

See spring-projects#723
  • Loading branch information
onobc committed Jul 30, 2024
1 parent 70e9e60 commit 9c29a49
Show file tree
Hide file tree
Showing 14 changed files with 608 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,24 @@ The `JsonPulsarHeaderMapper` has a property called `addToStringClasses()` that l
During inbound mapping, they are mapped as `String`.
By default, only `org.springframework.util.MimeType` and `org.springframework.http.MediaType` are mapped this way.

===== Custom ObjectMapper
The JSON mapper uses a reasonable configured Jasckson `ObjectMapper` to handle serialization of header values.
However, to provide a custom object mapper one must simply provide an `ObjectMapper` bean with the name `pulsarHeaderObjectMapper`.
For example:
[source, java]
----
@Configuration(proxyBeanMethods = false)
static class PulsarHeadersCustomObjectMapperTestConfig {
@Bean(name = "pulsarHeaderObjectMapper")
ObjectMapper customObjectMapper() {
var objectMapper = new ObjectMapper();
// do things with your special header object mapper here
return objectMapper;
}
}
----

=== Inbound/Outbound Patterns
On the inbound side, by default, all Pulsar headers (message metadata plus user properties) are mapped to `MessageHeaders`.
On the outbound side, by default, all `MessageHeaders` are mapped, except `id`, `timestamp`, and the headers that represent the Pulsar message metadata.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.springframework.core.MethodParameter;
import org.springframework.core.ResolvableType;
import org.springframework.core.log.LogAccessor;
import org.springframework.expression.BeanResolver;
import org.springframework.lang.Nullable;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.handler.annotation.Header;
Expand All @@ -53,6 +52,7 @@
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;

import com.fasterxml.jackson.databind.ObjectMapper;
import reactor.core.publisher.Flux;

/**
Expand All @@ -71,6 +71,8 @@ public class MethodReactivePulsarListenerEndpoint<V> extends AbstractReactivePul

private Method method;

private ObjectMapper objectMapper;

private MessageHandlerMethodFactory messageHandlerMethodFactory;

private SmartMessageConverter messagingConverter;
Expand Down Expand Up @@ -209,28 +211,33 @@ protected HandlerAdapter configureListenerAdapter(AbstractPulsarMessageToSpringM
@SuppressWarnings({ "unchecked", "rawtypes" })
protected AbstractPulsarMessageToSpringMessageAdapter<V> createMessageListenerInstance(
@Nullable MessageConverter messageConverter) {

AbstractPulsarMessageToSpringMessageAdapter<V> listener;
if (isFluxListener()) {
listener = new PulsarReactiveStreamingMessagingMessageListenerAdapter<>(this.bean, this.method);
}
else {
listener = new PulsarReactiveOneByOneMessagingMessageListenerAdapter<>(this.bean, this.method);
}

if (messageConverter instanceof PulsarMessageConverter) {
listener.setMessageConverter((PulsarMessageConverter) messageConverter);
if (messageConverter instanceof PulsarMessageConverter pulsarMessageConverter) {
listener.setMessageConverter(pulsarMessageConverter);
}
if (this.messagingConverter != null) {
listener.setMessagingConverter(this.messagingConverter);
}
BeanResolver resolver = getBeanResolver();
if (this.objectMapper != null) {
listener.setObjectMapper(this.objectMapper);
}
var resolver = getBeanResolver();
if (resolver != null) {
listener.setBeanResolver(resolver);
}
return listener;
}

public void setObjectMapper(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}

public void setMessagingConverter(SmartMessageConverter messagingConverter) {
this.messagingConverter = messagingConverter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.springframework.format.FormatterRegistry;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.annotation.AbstractPulsarAnnotationsBeanPostProcessor;
import org.springframework.pulsar.annotation.PulsarHeaderObjectMapperUtils;
import org.springframework.pulsar.annotation.PulsarListenerConfigurer;
import org.springframework.pulsar.config.PulsarAnnotationSupportBeanNames;
import org.springframework.pulsar.config.PulsarListenerEndpointRegistrar;
Expand Down Expand Up @@ -281,6 +282,9 @@ private void resolveDeadLetterPolicy(MethodReactivePulsarListenerEndpoint<?> end

@SuppressWarnings("unchecked")
protected void postProcessEndpointsBeforeRegistration() {
PulsarHeaderObjectMapperUtils.customMapper(this.beanFactory)
.ifPresent((objectMapper) -> this.processedEndpoints
.forEach((endpoint) -> endpoint.setObjectMapper(objectMapper)));
if (this.processedEndpoints.size() == 1) {
MethodReactivePulsarListenerEndpoint<?> endpoint = this.processedEndpoints.get(0);
if (endpoint.getConsumerCustomizer() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.pulsar.annotation.EnablePulsar;
import org.springframework.pulsar.core.DefaultPulsarProducerFactory;
Expand All @@ -68,18 +69,23 @@
import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarConsumerFactory;
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.BasicListenersTestCases.BasicListenersTestCasesConfig;
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.PulsarHeadersCustomObjectMapperTest.PulsarHeadersCustomObjectMapperTestConfig;
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.PulsarHeadersTest.PulsarListenerWithHeadersConfig;
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.StreamingListenerTestCases.StreamingListenerTestCasesConfig;
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SubscriptionTypeTests.WithDefaultType.WithDefaultTypeConfig;
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SubscriptionTypeTests.WithSpecificTypes.WithSpecificTypesConfig;
import org.springframework.pulsar.reactive.support.MessageUtils;
import org.springframework.pulsar.support.PulsarHeaders;
import org.springframework.pulsar.support.header.JsonPulsarHeaderMapper;
import org.springframework.pulsar.test.model.UserPojo;
import org.springframework.pulsar.test.model.UserRecord;
import org.springframework.pulsar.test.model.json.UserRecordDeserializer;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.util.ObjectUtils;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -541,40 +547,67 @@ Mono<Void> listenString(String ignored) {
class PulsarHeadersTest {

static CountDownLatch simpleListenerLatch = new CountDownLatch(1);
static CountDownLatch simpleListenerPojoLatch = new CountDownLatch(1);
static CountDownLatch pulsarMessageListenerLatch = new CountDownLatch(1);
static CountDownLatch springMessagingMessageListenerLatch = new CountDownLatch(1);

static AtomicReference<String> capturedData = new AtomicReference<>();
static AtomicReference<MessageId> messageId = new AtomicReference<>();
static AtomicReference<String> topicName = new AtomicReference<>();
static AtomicReference<String> fooValue = new AtomicReference<>();
static AtomicReference<Object> pojoValue = new AtomicReference<>();
static AtomicReference<byte[]> rawData = new AtomicReference<>();

@Test
void simpleListenerWithHeaders() throws Exception {
MessageId messageId = pulsarTemplate.newMessage("hello-simple-listener")
var topic = "rplt-simpleListenerWithHeaders";
var msg = "hello-%s".formatted(topic);
MessageId messageId = pulsarTemplate.newMessage(msg)
.withMessageCustomizer(messageBuilder -> messageBuilder.property("foo", "simpleListenerWithHeaders"))
.withTopic("simpleListenerWithHeaders")
.withTopic(topic)
.send();
assertThat(simpleListenerLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(capturedData.get()).isEqualTo("hello-simple-listener");
assertThat(PulsarHeadersTest.messageId.get()).isEqualTo(messageId);
assertThat(topicName.get()).isEqualTo("persistent://public/default/simpleListenerWithHeaders");
assertThat(fooValue.get()).isEqualTo("simpleListenerWithHeaders");
assertThat(rawData.get()).isEqualTo("hello-simple-listener".getBytes(StandardCharsets.UTF_8));
assertThat(PulsarHeadersTest.messageId).hasValue(messageId);
assertThat(topicName).hasValue("persistent://public/default/%s".formatted(topic));
assertThat(capturedData).hasValue(msg);
assertThat(rawData).hasValue(msg.getBytes(StandardCharsets.UTF_8));
assertThat(fooValue).hasValue("simpleListenerWithHeaders");
}

@Test
void simpleListenerWithPojoHeader() throws Exception {
var topic = "rplt-simpleListenerWithPojoHeader";
var msg = "hello-%s".formatted(topic);
// In order to send complex headers (pojo) must manually map and set each
// header as follows
var user = new UserRecord("that", 100);
var headers = new HashMap<String, Object>();
headers.put("user", user);
var headerMapper = JsonPulsarHeaderMapper.builder().build();
var mappedHeaders = headerMapper.toPulsarHeaders(new MessageHeaders(headers));
MessageId messageId = pulsarTemplate.newMessage(msg)
.withMessageCustomizer(messageBuilder -> mappedHeaders.forEach(messageBuilder::property))
.withTopic(topic)
.send();
assertThat(simpleListenerPojoLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(PulsarHeadersTest.messageId).hasValue(messageId);
assertThat(topicName).hasValue("persistent://public/default/%s".formatted(topic));
assertThat(pojoValue).hasValue(user);
assertThat(capturedData).hasValue(msg);
assertThat(rawData).hasValue(msg.getBytes(StandardCharsets.UTF_8));
}

@Test
void pulsarMessageListenerWithHeaders() throws Exception {
MessageId messageId = pulsarTemplate.newMessage("hello-pulsar-message-listener")
.withMessageCustomizer(
messageBuilder -> messageBuilder.property("foo", "pulsarMessageListenerWithHeaders"))
.withTopic("pulsarMessageListenerWithHeaders")
.withTopic("rplt-pulsarMessageListenerWithHeaders")
.send();
assertThat(pulsarMessageListenerLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(capturedData.get()).isEqualTo("hello-pulsar-message-listener");
assertThat(PulsarHeadersTest.messageId.get()).isEqualTo(messageId);
assertThat(topicName.get()).isEqualTo("persistent://public/default/pulsarMessageListenerWithHeaders");
assertThat(topicName.get()).isEqualTo("persistent://public/default/rplt-pulsarMessageListenerWithHeaders");
assertThat(fooValue.get()).isEqualTo("pulsarMessageListenerWithHeaders");
assertThat(rawData.get()).isEqualTo("hello-pulsar-message-listener".getBytes(StandardCharsets.UTF_8));
}
Expand All @@ -584,13 +617,13 @@ void springMessagingMessageListenerWithHeaders() throws Exception {
MessageId messageId = pulsarTemplate.newMessage("hello-spring-messaging-message-listener")
.withMessageCustomizer(
messageBuilder -> messageBuilder.property("foo", "springMessagingMessageListenerWithHeaders"))
.withTopic("springMessagingMessageListenerWithHeaders")
.withTopic("rplt-springMessagingMessageListenerWithHeaders")
.send();
assertThat(springMessagingMessageListenerLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(capturedData.get()).isEqualTo("hello-spring-messaging-message-listener");
assertThat(PulsarHeadersTest.messageId.get()).isEqualTo(messageId);
assertThat(topicName.get())
.isEqualTo("persistent://public/default/springMessagingMessageListenerWithHeaders");
.isEqualTo("persistent://public/default/rplt-springMessagingMessageListenerWithHeaders");
assertThat(fooValue.get()).isEqualTo("springMessagingMessageListenerWithHeaders");
assertThat(rawData.get())
.isEqualTo("hello-spring-messaging-message-listener".getBytes(StandardCharsets.UTF_8));
Expand All @@ -600,8 +633,9 @@ void springMessagingMessageListenerWithHeaders() throws Exception {
@Configuration
static class PulsarListenerWithHeadersConfig {

@ReactivePulsarListener(subscriptionName = "simple-listener-with-headers-sub",
topics = "simpleListenerWithHeaders", consumerCustomizer = "subscriptionInitialPositionEarliest")
@ReactivePulsarListener(topics = "rplt-simpleListenerWithHeaders",
subscriptionName = "rplt-simple-listener-with-headers-sub",
consumerCustomizer = "subscriptionInitialPositionEarliest")
Mono<Void> simpleListenerWithHeaders(String data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
@Header(PulsarHeaders.TOPIC_NAME) String topicName, @Header(PulsarHeaders.RAW_DATA) byte[] rawData,
@Header("foo") String foo) {
Expand All @@ -614,8 +648,23 @@ Mono<Void> simpleListenerWithHeaders(String data, @Header(PulsarHeaders.MESSAGE_
return Mono.empty();
}

@ReactivePulsarListener(subscriptionName = "pulsar-message-listener-with-headers-sub",
topics = "pulsarMessageListenerWithHeaders",
@ReactivePulsarListener(topics = "rplt-simpleListenerWithPojoHeader",
subscriptionName = "simpleListenerWithPojoHeader-sub",
consumerCustomizer = "subscriptionInitialPositionEarliest")
Mono<Void> simpleListenerWithPojoHeader(String data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
@Header(PulsarHeaders.TOPIC_NAME) String topicName, @Header(PulsarHeaders.RAW_DATA) byte[] rawData,
@Header("user") UserRecord user) {
capturedData.set(data);
PulsarHeadersTest.messageId.set(messageId);
PulsarHeadersTest.topicName.set(topicName);
pojoValue.set(user);
PulsarHeadersTest.rawData.set(rawData);
simpleListenerPojoLatch.countDown();
return Mono.empty();
}

@ReactivePulsarListener(subscriptionName = "rplt-pulsar-message-listener-with-headers-sub",
topics = "rplt-pulsarMessageListenerWithHeaders",
consumerCustomizer = "subscriptionInitialPositionEarliest")
Mono<Void> pulsarMessageListenerWithHeaders(Message<String> data,
@Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
Expand All @@ -630,8 +679,8 @@ Mono<Void> pulsarMessageListenerWithHeaders(Message<String> data,
return Mono.empty();
}

@ReactivePulsarListener(subscriptionName = "pulsar-message-listener-with-headers-sub",
topics = "springMessagingMessageListenerWithHeaders",
@ReactivePulsarListener(subscriptionName = "rplt-pulsar-message-listener-with-headers-sub",
topics = "rplt-springMessagingMessageListenerWithHeaders",
consumerCustomizer = "subscriptionInitialPositionEarliest")
Mono<Void> springMessagingMessageListenerWithHeaders(org.springframework.messaging.Message<String> data,
@Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
Expand All @@ -650,6 +699,62 @@ Mono<Void> springMessagingMessageListenerWithHeaders(org.springframework.messagi

}

@Nested
@ContextConfiguration(classes = PulsarHeadersCustomObjectMapperTestConfig.class)
class PulsarHeadersCustomObjectMapperTest {

private static final String TOPIC = "rplt-listenerWithPojoHeader-custom";

private static final CountDownLatch listenerLatch = new CountDownLatch(1);

private static UserRecord userPassedIntoListener;

@Test
void whenPulsarHeaderObjectMapperIsDefinedThenItIsUsedToDeserializeHeaders() throws Exception {
var msg = "hello-%s".formatted(TOPIC);
// In order to send complex headers (pojo) must manually map and set each
// header as follows
var user = new UserRecord("that", 100);
var headers = new HashMap<String, Object>();
headers.put("user", user);
var headerMapper = JsonPulsarHeaderMapper.builder().build();
var mappedHeaders = headerMapper.toPulsarHeaders(new MessageHeaders(headers));
pulsarTemplate.newMessage(msg)
.withMessageCustomizer(messageBuilder -> mappedHeaders.forEach(messageBuilder::property))
.withTopic(TOPIC)
.send();
// Custom deser adds suffix to name and bumps age + 5
var expectedUser = new UserRecord(user.name() + "-deser", user.age() + 5);
assertThat(listenerLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(userPassedIntoListener).isEqualTo(expectedUser);
}

@Configuration(proxyBeanMethods = false)
static class PulsarHeadersCustomObjectMapperTestConfig {

@Bean(name = "pulsarHeaderObjectMapper")
ObjectMapper customObjectMapper() {
var objectMapper = new ObjectMapper();
var module = new SimpleModule();
module.addDeserializer(UserRecord.class, new UserRecordDeserializer());
objectMapper.registerModule(module);
return objectMapper;
}

@ReactivePulsarListener(topics = TOPIC, subscriptionName = TOPIC + "-sub",
consumerCustomizer = "subscriptionInitialPositionEarliest")
Mono<Void> listenerWithPojoHeader(String data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
@Header(PulsarHeaders.TOPIC_NAME) String topicName, @Header(PulsarHeaders.RAW_DATA) byte[] rawData,
@Header("user") UserRecord user) {
userPassedIntoListener = user;
listenerLatch.countDown();
return Mono.empty();
}

}

}

@Nested
@ContextConfiguration(classes = PulsarListenerConcurrencyTestCases.TestPulsarListenersForConcurrency.class)
class PulsarListenerConcurrencyTestCases {
Expand Down
Loading

0 comments on commit 9c29a49

Please sign in to comment.