Spring Pulsar Message Key #1024
-
Hi @horiaradu1
You are more than welcome, and thank you for using the framework and for opening the discussion. Good sleuthing on why the headers are not visible in the binder case (the mapper is dropping them). This works in the non-binder scenarios because no mapper is involved.
Typically, you would need to provide a custom header mapper. The default mapper uses Jackson if it is available on the classpath and otherwise falls back to a default that does not need Jackson. Unfortunately, the "never" headers are not currently configurable via the Jackson or ToString constructors. You could create a custom mapper class and override the outbound matching so that the key header is let through:

    class CustomHeaderMapper extends JsonPulsarHeaderMapper {

        CustomHeaderMapper(ObjectMapper objectMapper, List<String> inboundPatterns, List<String> outboundPatterns,
                Set<String> trustedPackages, Set<String> toStringClasses) {
            super(objectMapper, inboundPatterns, outboundPatterns, trustedPackages, toStringClasses);
        }

        @Override
        protected boolean matchesForOutbound(String header) {
            // Let the message key header through; defer to the default rules for everything else
            return PulsarHeaders.KEY.equals(header) || super.matchesForOutbound(header);
        }
    }

We will try to get a fix in that allows the "never" headers to be configured via constructors. Hopefully it will be in the 1.2.3 release on 02/18.
Thanks,
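For anyone following along, the "never" behaviour described above can be illustrated with a small standalone model. This is only a sketch in plain Java and not the Spring Pulsar classes: `SketchHeaderMapper`, `KeyAwareHeaderMapper`, the `NEVER` set, and the `internal_example` entry are hypothetical names made up for illustration. Only the `pulsar_message_key` header name comes from this thread.

```java
import java.util.List;
import java.util.Set;

// Simplified stand-in for a header mapper. NOT the real Spring Pulsar class.
class SketchHeaderMapper {

    // Hypothetical "never" list: headers that are always dropped on the outbound side.
    static final Set<String> NEVER = Set.of("pulsar_message_key", "internal_example");

    private final List<String> outboundPatterns;

    SketchHeaderMapper(List<String> outboundPatterns) {
        this.outboundPatterns = outboundPatterns;
    }

    // A header is mapped only if it is not in NEVER and matches a configured pattern.
    // Patterns here are either "*" (everything) or an exact header name.
    public boolean matchesForOutbound(String header) {
        if (NEVER.contains(header)) {
            return false;
        }
        return outboundPatterns.stream().anyMatch(p -> p.equals("*") || p.equals(header));
    }
}

// Mirrors the override idea from the reply: allow the key header, defer otherwise.
class KeyAwareHeaderMapper extends SketchHeaderMapper {

    KeyAwareHeaderMapper(List<String> outboundPatterns) {
        super(outboundPatterns);
    }

    @Override
    public boolean matchesForOutbound(String header) {
        return "pulsar_message_key".equals(header) || super.matchesForOutbound(header);
    }
}

class NeverHeaderDemo {
    public static void main(String[] args) {
        SketchHeaderMapper base = new SketchHeaderMapper(List.of("*"));
        SketchHeaderMapper custom = new KeyAwareHeaderMapper(List.of("*"));
        System.out.println("base maps key:   " + base.matchesForOutbound("pulsar_message_key"));
        System.out.println("custom maps key: " + custom.matchesForOutbound("pulsar_message_key"));
    }
}
```

The point of the model is that a catch-all pattern such as `*` does not help, because the "never" check runs first. The subclass only widens the rule for the one header it names.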
-
@horiaradu1 I created #1037 to track this. If you are satisfied with the above response and the newly created issue, I will close this discussion. Thanks
-
Hello @onobc
I have done exactly that and created a custom header mapper. I have noticed, though, that with this method the property is sent and I can get and parse the value, but the key is not recognised as a message key, as this part of toSpringHeaders shows:

    @Override
    public MessageHeaders toSpringHeaders(Message<?> pulsarMessage) {
        Objects.requireNonNull(pulsarMessage, "pulsarMessage must not be null");
        var context = toSpringHeadersOnStarted(pulsarMessage);
        var headersMap = new LinkedHashMap<String, Object>();
        // custom user properties (headers)
        pulsarMessage.getProperties().forEach((name, value) -> {
            if (matchesForInbound(name)) {
                var valueToUse = toSpringHeaderValue(name, value, context);
                headersMap.put(name, valueToUse);
            }
        });
        // built-in Pulsar metadata headers
        if (pulsarMessage.hasKey()) {
            addToHeadersMapIfAllowed(PulsarHeaders.KEY, pulsarMessage::getKey, headersMap::put);
            addToHeadersMapIfAllowed(PulsarHeaders.KEY_BYTES, pulsarMessage::getKeyBytes, headersMap::put);
        }
        ...

What I did instead in the binder case was create a custom message handler that overrides handleMessageInternal:

    @Override
    protected void handleMessageInternal(Message<?> message) {
        try {
            // @formatter:off
            this.pulsarTemplate.newMessage(message.getPayload())
                .withTopic(this.destination)
                .withSchema(this.schema)
                .withProducerCustomizer(this.layeredProducerPropsCustomizer)
                .withMessageCustomizer(this.applySpringHeadersAsPulsarProperties(message.getHeaders()))
                // CUSTOM added message customizer for message key
                .withMessageCustomizer(this.applySpringHeaderAsPulsarKey(message.getHeaders()))
                //
                .sendAsync();
            // @formatter:on
        }
        catch (Exception ex) {
            logger.trace(ex, "Failed to send message to destination: " + this.destination);
        }
    }

    // Sets the Pulsar message key from the Spring header, if the header is present
    private TypedMessageBuilderCustomizer<Object> applySpringHeaderAsPulsarKey(MessageHeaders headers) {
        return (mb) -> {
            if (this.headerMapper != null && headers.containsKey(PulsarHeaders.KEY)) {
                Optional.ofNullable(headers.get(PulsarHeaders.KEY)).ifPresent(k -> mb.key(k.toString()));
            }
        };
    }
    ...

I added this message customizer for the message key. Would this also be a good solution for mapping keys to the Pulsar message, instead of sending them as a property?
Thank you,
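To make the difference between the two approaches concrete, here is a rough standalone model of "key sent as a property" versus "key set on the message itself". It is a sketch in plain Java under stated assumptions: `SketchMessageBuilder` and `KeyMappingSketch` are hypothetical stand-ins and not Pulsar or Spring Pulsar types, and the `hasKey()` check only imitates the one in the toSpringHeaders excerpt above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

// Hypothetical stand-in for a message under construction. NOT a Pulsar type.
class SketchMessageBuilder {
    final Map<String, String> properties = new LinkedHashMap<>();
    private String key;

    SketchMessageBuilder property(String name, String value) {
        properties.put(name, value);
        return this;
    }

    SketchMessageBuilder key(String value) {
        this.key = value;
        return this;
    }

    // True only when a real message key was set, regardless of properties.
    boolean hasKey() {
        return key != null;
    }

    String getKey() {
        return key;
    }
}

class KeyMappingSketch {
    // Header name taken from this thread.
    static final String KEY_HEADER = "pulsar_message_key";

    // Approach 1: the header passes the mapper and ends up as an ordinary property.
    static Consumer<SketchMessageBuilder> headerAsProperty(Map<String, Object> headers) {
        return mb -> Optional.ofNullable(headers.get(KEY_HEADER))
            .ifPresent(v -> mb.property(KEY_HEADER, v.toString()));
    }

    // Approach 2: a customizer copies the header value into the message key.
    static Consumer<SketchMessageBuilder> headerAsKey(Map<String, Object> headers) {
        return mb -> Optional.ofNullable(headers.get(KEY_HEADER))
            .ifPresent(v -> mb.key(v.toString()));
    }

    public static void main(String[] args) {
        Map<String, Object> headers = Map.of(KEY_HEADER, "order-42");

        SketchMessageBuilder viaProperty = new SketchMessageBuilder();
        headerAsProperty(headers).accept(viaProperty);

        SketchMessageBuilder viaKey = new SketchMessageBuilder();
        headerAsKey(headers).accept(viaKey);

        System.out.println("property approach hasKey: " + viaProperty.hasKey());
        System.out.println("key approach hasKey:      " + viaKey.hasKey());
    }
}
```

In this model the property approach carries the value but leaves `hasKey()` false, which matches the behaviour described in the post, while the customizer approach sets the key itself.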
-
Hello
I have a question about producing messages with headers on Pulsar.
I have tried it two separate ways: with a PulsarTemplate and with a Pulsar Producer Client.
Both of them sent the payload correctly, with a header named "pulsar_message_key", which had the value of the key I had given it.
I then tried sending the key, using the binder (I have a setup similar to sample-pulsar-binder).
But the "pulsar_message_key" header gets removed by the outBoundMatchers in AbstractPulsarHeaderMapper.java, those internal framework headers being in the NeverMatch list.
Is there a way that I am missing to set up the key (pulsar_message_key) using the binder method, maybe by setting it up in the binder producer configuration?
Or am I not correctly understanding how those internal headers should be set up and work?
Thank you all for the work you have done on this project, and please let me know if I missed any details.