Default-retryable setting is ignored #3066

Open
nataliyachabini opened this issue Jan 6, 2025 · 11 comments

@nataliyachabini

nataliyachabini commented Jan 6, 2025

Describe the issue

Even if default-retryable is set to false, the messages are retried. It is the same for retryable-exceptions.

There also seems to be a connection with transactions: once transaction-id-prefix is left empty, retries behave as intended.

To Reproduce

SpringBootApplication

@Configuration
@Slf4j
public class ConsumerConfig {
    @Bean
    public Consumer<String> consumeMessage() {
        return s -> {
            log.info("Consuming {}", s);
            throw new IllegalArgumentException(s);
        };
    }
}

application.yml

spring:
  cloud:
    function:
      definition: consumeMessage
    stream:
      kafka:
        binder:
          transaction:
            transaction-id-prefix: transaction-
          required-acks: all
          configuration:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
        bindings:
          consumeMessage-in-0:
            consumer:
              enable-dlq: true
      bindings:
        consumeMessage-in-0:
          group: my-group
          destination: my-topic
          consumer:
            default-retryable: false
            max-attempts: 5
            back-off-initial-interval: 100
            retryable-exceptions:
              java.lang.UnsupportedOperationException: true
              java.lang.IllegalArgumentException: false

There is a GitHub project with the minimal setup to reproduce the issue:
https://github.com/DidierLoiseau/kafka-transactions-and-retries/tree/main

When we submit a message on my-topic, it is retried 5 times despite default-retryable: false and java.lang.IllegalArgumentException: false.

We have asked a question on StackOverflow but (so far) have not gotten any answers: https://stackoverflow.com/questions/79309828/how-to-configure-retryable-exceptions-for-consumers-when-kafka-transactions-are

Version of the framework
Spring Boot 3.4.0 and Spring Cloud 2024.0.0

Additional context
Diving into the Spring Cloud Stream code, we found that KafkaMessageChannelBinder will set a RetryTemplate configured by buildRetryTemplate(properties) if there is no TransactionManager, but if there is one, it will configure an AfterRollbackProcessor instead, passing it only a BackOff without using the retryable exceptions configuration.
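
The difference between the two paths can be modelled in plain Java. The sketch below is a simplified model, not the binder's code: `RetryPathModel` and its methods are hypothetical names, and it assumes that max-attempts counts the first delivery as well as the retries. It counts how often a handler that always throws is invoked when the retry decision consults the retryable-exceptions map (the non-transactional path) and when only an attempt limit is available (the transactional path described above).

```java
import java.util.Map;

// Simplified model of the two retry paths; all names here are hypothetical.
class RetryPathModel {

    // Look up the exception type in the map, falling back to the default flag.
    static boolean isRetryable(Throwable failure,
                               Map<Class<? extends Throwable>, Boolean> retryable,
                               boolean defaultRetryable) {
        return retryable.getOrDefault(failure.getClass(), defaultRetryable);
    }

    // Non-transactional path: the exception type is consulted after each failure.
    static int deliveriesWithClassifier(RuntimeException failure, int maxAttempts,
                                        Map<Class<? extends Throwable>, Boolean> retryable,
                                        boolean defaultRetryable) {
        int deliveries = 0;
        while (deliveries < maxAttempts) {
            deliveries++; // the handler runs and throws `failure`
            if (!isRetryable(failure, retryable, defaultRetryable)) {
                break; // not retryable: stop after this delivery
            }
        }
        return deliveries;
    }

    // Transactional path as reported: only the attempt limit is known,
    // so every failure is redelivered until the limit is reached.
    static int deliveriesWithBackOffOnly(int maxAttempts) {
        int deliveries = 0;
        while (deliveries < maxAttempts) {
            deliveries++;
        }
        return deliveries;
    }
}
```

With the settings from this report (default-retryable: false, IllegalArgumentException: false, max-attempts: 5), the first method stops after a single delivery while the second always reaches the limit, which is the behavior observed once a transaction-id-prefix is set.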

@olegz olegz added the bug label Jan 6, 2025
@olegz olegz added this to the 4.2.1 milestone Jan 6, 2025
@olegz
Contributor

olegz commented Jan 6, 2025

Thank you for reporting it. Looking. . .

@olegz
Contributor

olegz commented Jan 7, 2025

I am still looking and trying to get clarification from the Kafka team, but it appears that introducing a transaction manager is mutually exclusive with the binder retry settings.
Basically, if you remove the following

transaction:
  transaction-id-prefix: transaction-
required-acks: all

... then everything works.

@nataliyachabini
Author

Thank you @olegz, this is indeed what we have also observed: the two cannot be used simultaneously.

@sobychacko
Contributor

sobychacko commented Jan 7, 2025

@nataliyachabini Normal binding retry mechanisms are disabled when using transactions. See this note in the ref docs:

Normal binder retries (and dead lettering) are not supported with transactions because the retries will run in the original transaction, which may be rolled back and any published records will be rolled back too. When retries are enabled (the common property maxAttempts is greater than zero) the retry properties are used to configure a DefaultAfterRollbackProcessor to enable retries at the container level. Similarly, instead of publishing dead-letter records within the transaction, this functionality is moved to the listener container, again via the DefaultAfterRollbackProcessor which runs after the main transaction has rolled back.

https://docs.spring.io/spring-cloud-stream/reference/kafka/kafka-binder/transactional.html

When transactions are enabled in Spring Cloud Stream Kafka binder, retries are handled differently:

Binder-level retry settings (default-retryable, retryable-exceptions) are bypassed. Instead, retry behavior is controlled by the DefaultAfterRollbackProcessor at the container level.
To disable retries, you must configure the AfterRollbackProcessor directly using the ListenerContainerCustomizer:

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer() {
    return (container, destination, group) -> container.setAfterRollbackProcessor(
        new DefaultAfterRollbackProcessor<byte[], byte[]>(
            (record, exception) -> log.error("Failed to process: {}", exception.getMessage()),
            new FixedBackOff(0L, 0)  // Disables retries
        )
    );
}

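Here is another example that mirrors your retry settings (max-attempts: 5, back-off-initial-interval: 100) and does not retry IllegalArgumentException.

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer() {
    return (container, destination, group) -> {
        // ExponentialBackOffWithMaxRetries comes from org.springframework.kafka.support.
        // max-attempts: 5 means one delivery plus four retries.
        ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(4);
        backOff.setInitialInterval(100L); // back-off-initial-interval: 100
        backOff.setMultiplier(2.0);       // binder default for back-off-multiplier
        backOff.setMaxInterval(10_000L);  // binder default for back-off-max-interval
        DefaultAfterRollbackProcessor<byte[], byte[]> processor = new DefaultAfterRollbackProcessor<>(
            // The recoverer runs once retries are exhausted or the exception is not retryable
            (record, exception) -> log.error("Giving up on record: {}", exception.getMessage()),
            backOff);
        // Skip retries for IllegalArgumentException; other exceptions are still retried
        processor.addNotRetryableExceptions(IllegalArgumentException.class);
        container.setAfterRollbackProcessor(processor);
    };
}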

Here is a blog that might be related: https://spring.io/blog/2023/10/11/transactional-rollback-strategies-with-spring-cloud-stream-and-apache-kafka
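
To make the back-off side of these settings concrete, here is a small plain-Java sketch of the delay schedule an exponential back-off produces. It is an illustration only: `BackOffSchedule` is a hypothetical helper, not a Spring class, and it assumes that max-attempts includes the first delivery. The multiplier of 2.0 and maximum interval of 10000 ms used in the usage note are the documented consumer defaults, assumed here because the reported configuration does not set them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: lists the delays (in ms) before each retry.
class BackOffSchedule {

    static List<Long> delays(int maxAttempts, long initialInterval,
                             double multiplier, long maxInterval) {
        List<Long> result = new ArrayList<>();
        long current = initialInterval;
        // maxAttempts includes the first delivery, so there are maxAttempts - 1 retries.
        for (int retry = 1; retry < maxAttempts; retry++) {
            result.add(Math.min(current, maxInterval)); // cap each delay at maxInterval
            current = (long) (current * multiplier);
        }
        return result;
    }
}
```

Calling `BackOffSchedule.delays(5, 100L, 2.0, 10_000L)` returns `[100, 200, 400, 800]`: four retries after the initial delivery, each waiting twice as long as the previous one.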

@olegz
Contributor

olegz commented Jan 8, 2025

@nataliyachabini So, given the explanation, does it answer your question?

@olegz olegz removed this from the 4.2.1 milestone Jan 8, 2025
@nataliyachabini
Author

Yes, thank you for your help @olegz @sobychacko

@DidierLoiseau

Thanks @olegz and @sobychacko for the help there, and thanks @nataliyachabini for handling this while I was off.

I noticed, however, that directly overriding the AfterRollbackProcessor is a bit cumbersome, because it should still submit the message to the DLQ at the end, as the default implementation provided by KafkaMessageChannelBinder does.

I also noticed that DefaultAfterRollbackProcessor is actually an ExceptionClassifier, so it can directly handle our retryable-exceptions here!

Moreover, KafkaMessageChannelBinder allows declaring a ListenerContainerWithDlqAndRetryCustomizer (or a KafkaListenerContainerCustomizer since #3015), so it is even possible to retrieve the extended consumer properties and re-implement the logic of the non-transactional behavior like this:

@Bean
public KafkaListenerContainerCustomizer customizeDefaultAfterRollbackProcessor() {
	log.info("Creating ListenerContainerCustomizer bean");
	return new KafkaListenerContainerCustomizer() {
		@Override
		public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group,
							  ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
			if (container.getAfterRollbackProcessor() instanceof ExceptionClassifier classifier) {
				if (!extendedConsumerProperties.isDefaultRetryable()) {
					classifier.defaultFalse(true);
				}
				extendedConsumerProperties.getRetryableExceptions()
						.forEach((t, retry) -> {
							if (Exception.class.isAssignableFrom(t)) {
								var ex = t.asSubclass(Exception.class);
								if (retry) {
									classifier.addRetryableExceptions(ex);
								} else {
									classifier.addNotRetryableExceptions(ex);
								}
							}
						});
			}
		}

		@Override
		public void configure(AbstractMessageListenerContainer<?, ?> container, String destination, String group) {
		}
	};
}

(Note that it is not possible to implement it with a lambda, because the new method is a default method and the only abstract method is the one inherited from ListenerContainerCustomizer; the documentation thus seems incorrect.)
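
The lambda limitation can be reproduced without any Spring types. In the sketch below, `BaseCustomizer` and `ExtendedCustomizer` are hypothetical stand-ins for ListenerContainerCustomizer and KafkaListenerContainerCustomizer, with strings in place of the real parameter types; that the default overload delegates to the inherited method is an assumption made for the illustration.

```java
// Hypothetical stand-ins for the two customizer interfaces; not the real types.
interface BaseCustomizer {
    void configure(String container, String destination, String group);
}

interface ExtendedCustomizer extends BaseCustomizer {
    // Assumption: the extended overload is a default method that delegates
    // to the inherited three-argument one.
    default void configure(String container, String destination, String group, String properties) {
        configure(container, destination, group);
    }
}

class LambdaTargetDemo {
    static String lastCall = "none";

    static ExtendedCustomizer fromLambda() {
        // A lambda can only implement the single abstract method,
        // which is the three-argument one inherited from BaseCustomizer.
        return (container, destination, group) -> lastCall = "three-arg";
    }

    static ExtendedCustomizer fromAnonymousClass() {
        // An anonymous class can override the default overload as well.
        return new ExtendedCustomizer() {
            @Override
            public void configure(String container, String destination, String group) {
                lastCall = "three-arg";
            }

            @Override
            public void configure(String container, String destination, String group, String properties) {
                lastCall = "four-arg with " + properties;
            }
        };
    }
}
```

Calling the four-argument `configure` on the lambda-based instance ends up in the three-argument body, so the extra argument never reaches user code; only the anonymous class (or a named class) sees it.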

Wouldn’t it make sense to apply this by default when creating the DefaultAfterRollbackProcessor in KafkaMessageChannelBinder?

DidierLoiseau added a commit to DidierLoiseau/kafka-transactions-and-retries that referenced this issue Feb 3, 2025
@DidierLoiseau

Hey @sobychacko, should I create a separate issue for applying the retryable-exceptions and default-retryable by default on the DefaultAfterRollbackProcessor that is created in KafkaMessageChannelBinder?

It wouldn’t be as flexible as a RetryTemplate (which can be provided in case of non-transactional configuration) but I think it would already be an improvement. (The main issue being that the AfterRollbackProcessor cannot be customized easily without re-implementing the anonymous class defined in KafkaMessageChannelBinder)

@sobychacko
Contributor

@DidierLoiseau A separate issue is not necessary. I will re-open this one, and we can address it here. We will get back to you soon on this. Thanks!

@sobychacko sobychacko reopened this Feb 4, 2025
@sobychacko
Contributor

@DidierLoiseau, are you open to sending a PR with these changes? If so, please give it a try. Either way, let us know. Thanks!

@sobychacko sobychacko added this to the 4.3.0 milestone Feb 25, 2025
DidierLoiseau added a commit to DidierLoiseau/spring-cloud-stream that referenced this issue Mar 1, 2025
DidierLoiseau added a commit to DidierLoiseau/spring-cloud-stream that referenced this issue Mar 2, 2025
@DidierLoiseau

@sobychacko done with #3089 🙂

DidierLoiseau added a commit to DidierLoiseau/spring-cloud-stream that referenced this issue Mar 2, 2025