Skip to content

Commit

Permalink
Allow custom object mapper for messages
Browse files Browse the repository at this point in the history
This commit adds support for a user-provided Jackson ObjectMapper to be
used when de/serializing JSON messages.

Additionally, adds Gradle test fixtures to the spring-pulsar module and
deprecates the UserRecord and UserPojo in spring-pulsar-test in favor
of their equivalent in the test fixture.

See spring-projects#723
  • Loading branch information
onobc committed Jul 29, 2024
1 parent 3fcb22d commit 0aea20a
Show file tree
Hide file tree
Showing 32 changed files with 864 additions and 50 deletions.
1 change: 1 addition & 0 deletions spring-pulsar-docs/src/main/antora/modules/ROOT/nav.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
*** xref:reference/reactive-pulsar/reactive-message-consumption.adoc[]
*** xref:reference/tombstones-reactive.adoc[]
** xref:reference/topic-resolution.adoc[]
** xref:reference/custom-object-mapper.adoc[]
** xref:reference/pulsar-admin.adoc[]
** xref:reference/pulsar-function.adoc[]
** xref:reference/observability.adoc[]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
[[custom-object-mapper]]
= Custom Object Mapper
include::../attributes/attributes.adoc[]

Pulsar uses an internal Jackson `ObjectMapper` when de/serializing JSON messages.
If you instead want to provide your own object mapper instance, you can register a `SchemaResolverCustomizer` and set your mapper on the `DefaultSchemaResolver` as follows:

[source,java,indent=0,subs="verbatim"]
----
@Bean
SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (DefaultSchemaResolver schemaResolver) -> {
var myObjectMapper = obtainMyObjectMapper();
schemaResolver.setObjectMapper(myObjectMapper);
};
}
----

This results in your object mapper being used to de/serialize all JSON messages that go through the schema resolution process (i.e. in cases where you do not pass a schema in directly when producing/consuming messages).

Under the hood, the resolver creates a special JSON schema which leverages the custom mapper and is used as the schema for all resolved JSON messages.

If you need to pass schema instances directly you can use the `JSONSchemaUtil` to create schemas that respect the custom mapper.
The following example shows how to do this when sending a message with the `PulsarTemplate` variant that takes a schema parameter:

[source,java,indent=0,subs="verbatim"]
----
void sendMessage(PulsarTemplate<MyPojo> template, MyPojo toSend) {
var myObjectMapper = obtainMyObjectMapper();
var schema = JSONSchemaUtil.schemaForTypeWithObjectMapper(MyPojo.class, myObjectMapper);
template.send(toSend, schema);
}
----


[CAUTION]
====
Pulsar configures its default object mapper in a particular way.
Unless you have a specific reason to not do so, it is highly recommended that you configure your mapper with these same options as follows:
[source,java,indent=0,subs="verbatim"]
----
myObjectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
myObjectMapper.configure(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL, false);
myObjectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
----
====
NOTE: A later version of the framework may instead provide a customizer that operates on the default mapper rather than requiring a separate instance.
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
= What's new?

[[what-s-new-in-1-2-since-1-1]]
== What's New in 1.2 Since 1.1
:page-section-summary-toc: 1

This section covers the changes made from version 1.1 to version 1.2.

=== Custom Object Mapper
You can provide your own Jackson `ObjectMapper` that Pulsar will use when producing and consuming JSON messages.
See xref:./reference/custom-object-mapper.adoc[Custom Object Mapper] for more details.

[[what-s-new-in-1-1-since-1-0]]
== What's New in 1.1 Since 1.0
:page-section-summary-toc: 1
Expand Down
1 change: 1 addition & 0 deletions spring-pulsar-reactive/spring-pulsar-reactive.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ dependencies {
optional libs.json.path

testImplementation project(':spring-pulsar-test')
testImplementation(testFixtures(project(":spring-pulsar")))
testRuntimeOnly libs.logback.classic
testImplementation 'io.projectreactor:reactor-test'
testImplementation 'org.assertj:assertj-core'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@
import org.springframework.lang.Nullable;
import org.springframework.pulsar.core.DefaultSchemaResolver;
import org.springframework.pulsar.core.DefaultTopicResolver;
import org.springframework.pulsar.core.JSONSchemaUtil;
import org.springframework.pulsar.test.model.UserRecord;
import org.springframework.pulsar.test.model.json.UserRecordObjectMapper;
import org.springframework.pulsar.test.support.PulsarTestContainerSupport;
import org.springframework.pulsar.test.support.model.UserRecord;
import org.springframework.util.function.ThrowingConsumer;

import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -332,6 +334,25 @@ void withJsonSchema() throws Exception {

}

@Nested
class CustomObjectMapperTests {

@Test
void sendWithCustomJsonSchema() throws Exception {
// Prepare the schema with custom object mapper
var objectMapper = UserRecordObjectMapper.withSer();
var schema = JSONSchemaUtil.schemaForTypeWithObjectMapper(UserRecord.class, objectMapper);
var topic = "rptt-custom-object-mapper-topic";
var user = new UserRecord("elFoo", 21);
// serializer adds '-ser' to name and 10 to age
var expectedUser = new UserRecord("elFoo-ser", 31);
ThrowingConsumer<ReactivePulsarTemplate<UserRecord>> sendFunction = (
template) -> template.send(topic, user, schema).subscribe();
sendAndConsume(sendFunction, topic, schema, expectedUser, false);
}

}

public static class Foo {

private String foo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.PulsarClient;
Expand All @@ -37,10 +38,13 @@
import org.junit.jupiter.api.Test;

import org.springframework.core.log.LogAccessor;
import org.springframework.pulsar.core.JSONSchemaUtil;
import org.springframework.pulsar.reactive.core.DefaultReactivePulsarConsumerFactory;
import org.springframework.pulsar.reactive.core.DefaultReactivePulsarSenderFactory;
import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
import org.springframework.pulsar.test.model.UserRecord;
import org.springframework.pulsar.test.model.json.UserRecordObjectMapper;
import org.springframework.pulsar.test.support.PulsarTestContainerSupport;

import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -306,28 +310,69 @@ void deadLetterTopicCustomizer() throws Exception {
}
}

@Test
void oneByOneMessageHandlerWithCustomObjectMapper() throws Exception {
var pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()).build();
ReactivePulsarMessageListenerContainer<UserRecord> container = null;
try {
// Prepare the schema with custom object mapper
var objectMapper = UserRecordObjectMapper.withDeser();
var schema = JSONSchemaUtil.schemaForTypeWithObjectMapper(UserRecord.class, objectMapper);

var reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient);
var topic = topicNameForTest("com-topic");
var consumerFactory = createAndPrepareConsumerFactory(topic, schema, reactivePulsarClient);
var containerProperties = new ReactivePulsarContainerProperties<UserRecord>();
containerProperties.setSchema(schema);
var latch = new CountDownLatch(1);
AtomicReference<UserRecord> consumedRecordRef = new AtomicReference<>();
containerProperties.setMessageHandler((ReactivePulsarOneByOneMessageHandler<UserRecord>) (msg) -> {
consumedRecordRef.set(msg.getValue());
return Mono.fromRunnable(latch::countDown);
});
container = new DefaultReactivePulsarMessageListenerContainer<>(consumerFactory, containerProperties);
container.start();

var sentUserRecord = new UserRecord("person", 51);
// deser adds '-deser' to name and 5 to age
var expectedConsumedUser = new UserRecord("person-deser", 56);
createPulsarTemplate(topic, reactivePulsarClient).send(sentUserRecord).subscribe();
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(consumedRecordRef).hasValue(expectedConsumedUser);
}
finally {
safeStopContainer(container);
pulsarClient.close();
}
}

private String topicNameForTest(String suffix) {
return "drpmlct-" + suffix;
}

private DefaultReactivePulsarConsumerFactory<String> createAndPrepareConsumerFactory(String topic,
ReactivePulsarClient reactivePulsarClient) {
ReactiveMessageConsumerBuilderCustomizer<String> defaultConfig = (builder) -> {
return this.createAndPrepareConsumerFactory(topic, Schema.STRING, reactivePulsarClient);
}

private <T> DefaultReactivePulsarConsumerFactory<T> createAndPrepareConsumerFactory(String topic, Schema<T> schema,
ReactivePulsarClient reactivePulsarClient) {
ReactiveMessageConsumerBuilderCustomizer<T> defaultConfig = (builder) -> {
builder.topic(topic);
builder.subscriptionName(topic + "-sub");
};
var consumerFactory = new DefaultReactivePulsarConsumerFactory<>(reactivePulsarClient, List.of(defaultConfig));
var consumerFactory = new DefaultReactivePulsarConsumerFactory<T>(reactivePulsarClient, List.of(defaultConfig));
// Ensure subscription is created
consumerFactory.createConsumer(Schema.STRING).consumeNothing().block(Duration.ofSeconds(5));
consumerFactory.createConsumer(schema).consumeNothing().block(Duration.ofSeconds(5));
return consumerFactory;
}

private ReactivePulsarTemplate<String> createPulsarTemplate(String topic,
private <T> ReactivePulsarTemplate<T> createPulsarTemplate(String topic,
ReactivePulsarClient reactivePulsarClient) {
var producerFactory = DefaultReactivePulsarSenderFactory.<String>builderFor(reactivePulsarClient)
var producerFactory = DefaultReactivePulsarSenderFactory.<T>builderFor(reactivePulsarClient)
.withDefaultTopic(topic)
.build();
return new ReactivePulsarTemplate<>(producerFactory);
return new ReactivePulsarTemplate<T>(producerFactory);
}

private void safeStopContainer(ReactivePulsarMessageListenerContainer<?> container) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListenerMessageConsumerBuilderCustomizer;
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerAutoConsumeSchemaTests.ReactivePulsarListenerAutoConsumeSchemaTestsConfig;
import org.springframework.pulsar.test.support.model.UserPojo;
import org.springframework.pulsar.test.support.model.UserRecord;
import org.springframework.pulsar.test.model.UserPojo;
import org.springframework.pulsar.test.model.UserRecord;
import org.springframework.test.context.ContextConfiguration;

import reactor.core.publisher.Mono;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SubscriptionTypeTests.WithSpecificTypes.WithSpecificTypesConfig;
import org.springframework.pulsar.reactive.support.MessageUtils;
import org.springframework.pulsar.support.PulsarHeaders;
import org.springframework.pulsar.test.support.model.UserPojo;
import org.springframework.pulsar.test.support.model.UserRecord;
import org.springframework.pulsar.test.model.UserPojo;
import org.springframework.pulsar.test.model.UserRecord;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.util.ObjectUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ ext['pulsar.version'] = "${pulsarVersion}"
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-pulsar'
developmentOnly 'org.springframework.boot:spring-boot-docker-compose'
testImplementation project(':spring-pulsar-test')
// temporary until JsonSchemaUtil published
implementation project(':spring-pulsar')
implementation(testFixtures(project(":spring-pulsar")))
implementation project(':spring-pulsar-test')
testRuntimeOnly 'ch.qos.logback:logback-classic'
testImplementation "org.springframework.boot:spring-boot-starter-test"
testImplementation "org.springframework.boot:spring-boot-testcontainers"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,12 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.pulsar.core.DefaultSchemaResolver;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.core.PulsarTopic;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.test.model.UserRecord;
import org.springframework.pulsar.test.model.json.UserRecordObjectMapper;

@SpringBootApplication
public class ImperativeProduceAndConsumeApp {
Expand All @@ -55,7 +59,7 @@ ApplicationRunner sendPrimitiveMessagesToPulsarTopic(PulsarTemplate<String> temp
};
}

@PulsarListener(topics = TOPIC, subscriptionName = TOPIC+"-sub")
@PulsarListener(topics = TOPIC, subscriptionName = TOPIC + "-sub")
void consumePrimitiveMessagesFromPulsarTopic(String msg) {
LOG.info("++++++CONSUME {}------", msg);
}
Expand All @@ -79,7 +83,7 @@ ApplicationRunner sendComplexMessagesToPulsarTopic(PulsarTemplate<Foo> template)
};
}

@PulsarListener(topics = TOPIC, subscriptionName = TOPIC+"-sub")
@PulsarListener(topics = TOPIC, subscriptionName = TOPIC + "-sub")
void consumeComplexMessagesFromPulsarTopic(Foo msg) {
LOG.info("++++++CONSUME {}------", msg);
}
Expand Down Expand Up @@ -108,7 +112,7 @@ ApplicationRunner sendPartitionedMessagesToPulsarTopic(PulsarTemplate<String> te
};
}

@PulsarListener(topics = TOPIC, subscriptionName = TOPIC+"-sub")
@PulsarListener(topics = TOPIC, subscriptionName = TOPIC + "-sub")
void consumePartitionedMessagesFromPulsarTopic(String msg) {
LOG.info("++++++CONSUME {}------", msg);
}
Expand All @@ -132,7 +136,7 @@ ApplicationRunner sendBatchMessagesToPulsarTopic(PulsarTemplate<Foo> template) {
};
}

@PulsarListener(topics = TOPIC, subscriptionName = TOPIC+"-sub", batch = true)
@PulsarListener(topics = TOPIC, subscriptionName = TOPIC + "-sub", batch = true)
void consumeBatchMessagesFromPulsarTopic(List<Foo> messages) {
messages.forEach((msg) -> LOG.info("++++++CONSUME {}------", msg));
}
Expand Down Expand Up @@ -162,6 +166,38 @@ void consumeBarWithoutTopicOrSchema(Bar msg) {

}

@Configuration(proxyBeanMethods = false)
static class ProduceConsumeCustomObjectMapper {

private static final String TOPIC = "produce-consume-custom-object-mapper";

@Bean
SchemaResolver.SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (DefaultSchemaResolver schemaResolver) -> {
var objectMapper = UserRecordObjectMapper.withSerAndDeser();
schemaResolver.setObjectMapper(objectMapper);
};
}

@Bean
ApplicationRunner sendWithCustomObjectMapper(PulsarTemplate<UserRecord> template) {
return (args) -> {
for (int i = 0; i < 10; i++) {
var user = new UserRecord("user-" + i, 30);
template.send(TOPIC, user);
LOG.info("++++++PRODUCE {}------", user);
}
};
}

@PulsarListener(topics = TOPIC)
void consumeWithCustomObjectMapper(UserRecord user) {
LOG.info("++++++CONSUME {}------", user);
}

}


record Foo(String name, Integer value) {
}

Expand Down
Loading

0 comments on commit 0aea20a

Please sign in to comment.