From bf33c9f28055c4d85394ef557265083ecfadfa9e Mon Sep 17 00:00:00 2001 From: Taeik Lim Date: Thu, 26 Oct 2023 23:36:36 +0900 Subject: [PATCH] Fix broken example code 'retry(When' -> 'retryWhen' --- src/docs/asciidoc/api-guide.adoc | 4 ++-- src/docs/asciidoc/examples.adoc | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/docs/asciidoc/api-guide.adoc b/src/docs/asciidoc/api-guide.adoc index 2bd162c5..0702142a 100644 --- a/src/docs/asciidoc/api-guide.adoc +++ b/src/docs/asciidoc/api-guide.adoc @@ -334,8 +334,8 @@ which will ensure that a new consumer is created: -------- Flux> inboundFlux = KafkaReceiver.create(receiverOptions) - .receive() - .retry(When(Retry.backoff(3, Duration.of(10L, ChronoUnit.SECONDS)))); + .receive() + .retryWhen(Retry.backoff(3, Duration.of(10L, ChronoUnit.SECONDS))); -------- Any errors related to the event processing rather than the `KafkaConsumer` itself should be handled as close to the source as possible diff --git a/src/docs/asciidoc/examples.adoc b/src/docs/asciidoc/examples.adoc index 93ec755f..9793082e 100644 --- a/src/docs/asciidoc/examples.adoc +++ b/src/docs/asciidoc/examples.adoc @@ -72,7 +72,7 @@ KafkaSender.create(senderOptions) .send(source.flux().map(r -> transform(r))) // <4> .doOnError(e-> log.error("Send failed, terminating.", e)) // <5> .doOnNext(r -> source.commit(r.correlationMetadata())) // <6> - .retry(When(Retry.backoff(3, Duration.of(10L, ChronoUnit.SECONDS)))); + .retryWhen(Retry.backoff(3, Duration.of(10L, ChronoUnit.SECONDS))); -------- <1> Send is acknowledged by Kafka for acks=all after message is delivered to all in-sync replicas <2> Large number of retries in the producer to cope with transient failures in brokers @@ -100,7 +100,7 @@ KafkaReceiver.create(receiverOptions) .publishOn(aBoundedElasticScheduler) // <4> .concatMap(m -> sink.store(transform(m)) // <5> .doOnSuccess(r -> m.receiverOffset().commit().block())) // <6> - .retry(When(Retry.backoff(3, Duration.of(10L, ChronoUnit.SECONDS)))) + .retryWhen(Retry.backoff(3, Duration.of(10L, ChronoUnit.SECONDS))); -------- <1> Disable periodic commits <2> Disable commits by batch size