-
Notifications
You must be signed in to change notification settings - Fork 615
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Default-retryable setting is ignored #3066
Comments
Thank you for reporting it. Looking. . . |
I am still looking and trying to get clarification from the Kafka team, but it appears that introduction of transaction manager is mutually exclusive to binder retry settings.
. . . then everything works |
Thank you @olegz, this is indeed what we also have observed, that they cannot be used simultaneously. |
@nataliyachabini Normal binding retry mechanisms are disabled when using transactions. See this note in the ref docs:
https://docs.spring.io/spring-cloud-stream/reference/kafka/kafka-binder/transactional.html When transactions are enabled in Spring Cloud Stream Kafka binder, retries are handled differently: Binder-level retry settings (default-retryable, retryable-exceptions) are bypassed. Instead, retry behavior is controlled by the
Here is another example matching your retry settings.
Here is a blog that might be related: https://spring.io/blog/2023/10/11/transactional-rollback-strategies-with-spring-cloud-stream-and-apache-kafka |
@nataliyachabini So, given the explanation, does it answer your question? |
Yes, thank you for your help @olegz @sobychacko |
Thanks @olegz and @sobychacko for the help there, and thanks @nataliyachabini for handling this while I was off. I noticed however that directly overriding the I noticed however, that the Moreover, @Bean
public KafkaListenerContainerCustomizer customizeDefaultAfterRollbackProcessor() {
log.info("Creating ListenerContainerCustomizer bean");
return new KafkaListenerContainerCustomizer() {
@Override
public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group,
ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
if (container.getAfterRollbackProcessor() instanceof ExceptionClassifier classifier) {
if (!extendedConsumerProperties.isDefaultRetryable()) {
classifier.defaultFalse(true);
}
extendedConsumerProperties.getRetryableExceptions()
.forEach((t, retry) -> {
if (Exception.class.isAssignableFrom(t)) {
var ex = t.asSubclass(Exception.class);
if (retry) {
classifier.addRetryableExceptions(ex);
} else {
classifier.addNotRetryableExceptions(ex);
}
}
});
}
}
@Override
public void configure(AbstractMessageListenerContainer<?, ?> container, String destination, String group) {
}
};
} (Note that it is not possible to implement it with a lambda because the new method is Wouldn’t it make sense to apply this by default when creating the |
Hey @sobychacko, should I create a separate issue for applying the It wouldn’t be as flexible as a |
@DidierLoiseau A Separate issue is not necessary. I will re-open this issue, and we can address it using it. We will get back to you soon on this. Thanks! |
@DidierLoiseau, Are you open to sending a PR with these changes? If so, please give it a try. Either way, let us know. Thanks! |
…her with transactions
…her with transactions
@sobychacko done with #3089 🙂 |
…her with transactions
Describe the issue
Even if
default-retryable
is set to false, the messages are retried. It is the same forretryable-exceptions
.There also seems to be a connection with the transactions, as once the
transaction-id-prefix
is empty, it works as intended.To Reproduce
SpringBootApplication
application.xml
There is a GitHub project with the minimal setup to reproduce the issue:
https://github.com/DidierLoiseau/kafka-transactions-and-retries/tree/main
When we submit a message on my-topic, it will retry it 5 times despite the default-retryable: false and java.lang.IllegalArgumentException: false.
We have asked a question on StackOverflow but (so far) have not gotten any answers: https://stackoverflow.com/questions/79309828/how-to-configure-retryable-exceptions-for-consumers-when-kafka-transactions-are
Version of the framework
Spring Boot 3.4.0 and Spring Cloud 2024.0.0
Additional context
Diving into the Spring Cloud Stream code, we found that KafkaMessageChannelBinder will set a RetryTemplate configured by buildRetryTemplate(properties) if there is no TransactionManager, but if there is one, it will configure an AfterRollbackProcessor instead, passing it only a BackOff without using the retryable exceptions configuration.
The text was updated successfully, but these errors were encountered: