diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClient.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClient.java
index 0cb2da1a3a..943f13156c 100644
--- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClient.java
+++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClient.java
@@ -15,6 +15,7 @@
*/
package reactor.netty.http.client;
+import java.io.IOException;
import java.net.SocketAddress;
import java.net.URI;
import java.time.Duration;
@@ -25,6 +26,7 @@
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.function.Supplier;
import io.netty.buffer.ByteBuf;
@@ -49,6 +51,7 @@
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.NettyOutbound;
+import reactor.netty.channel.AbortedException;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.http.Http2SettingsSpec;
import reactor.netty.http.HttpProtocol;
@@ -60,6 +63,7 @@
import reactor.netty.transport.ClientTransport;
import reactor.util.Metrics;
import reactor.util.annotation.Nullable;
+import reactor.util.retry.RetrySpec;
/**
* An HttpClient allows building in a safe immutable way an http client that is
@@ -634,11 +638,11 @@ public final RequestSender delete() {
}
/**
- * Option to disable {@code retry once} support for the outgoing requests that fail with
- * {@link reactor.netty.channel.AbortedException#isConnectionReset(Throwable)}.
- *
By default this is set to false in which case {@code retry once} is enabled.
+ * Option to disable {@code request retry} established by {@link #requestRetry(RetrySpec)}.
+ * See its doc for any default retry behavior!
+ *
By default this is set to false.
*
- * @param disableRetry true to disable {@code retry once}, false to enable it
+ * @param disableRetry true to disable {@code request retry}, false to enable it
*
* @return a new {@link HttpClient}
* @since 0.9.6
@@ -652,6 +656,21 @@ public final HttpClient disableRetry(boolean disableRetry) {
return dup;
}
+ /**
+ * Option to customize {@code request retry} behavior. If any HTTP request data
+ * (headers, body, etc.) is sent, the request will not be retried regardless of
+ * this configuration. This can be disabled via {@link #disableRetry(boolean)}.
+ *
+ *
This defaults to {@code retry once} for outgoing requests that fail with
+ * {@link reactor.netty.channel.AbortedException#isConnectionReset(Throwable)}.
+ */
+ public final HttpClient requestRetry(final RetrySpec requestRetry) {
+ Objects.requireNonNull(requestRetry, "requestRetry");
+ HttpClient dup = duplicate();
+ dup.configuration().requestRetrySpec = requestRetry;
+ return dup;
+ }
+
/**
* Setup a callback called when {@link HttpClientRequest} has been sent
* and {@link HttpClientState#REQUEST_SENT} has been emitted.
diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java
index 062517539d..515996372a 100644
--- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java
+++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java
@@ -68,6 +68,7 @@
import reactor.netty.NettyOutbound;
import reactor.netty.NettyPipeline;
import reactor.netty.ReactorNetty;
+import reactor.netty.channel.AbortedException;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.channel.ChannelOperations;
import reactor.netty.http.Http2SettingsSpec;
@@ -83,6 +84,8 @@
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;
+import reactor.util.retry.Retry;
+import reactor.util.retry.RetrySpec;
import static reactor.netty.ReactorNetty.format;
import static reactor.netty.http.client.Http2ConnectionProvider.OWNER;
@@ -314,7 +317,11 @@ public WebsocketClientSpec websocketClientSpec() {
BiConsumer redirectRequestBiConsumer;
Consumer redirectRequestConsumer;
Duration responseTimeout;
+
+ RetrySpec requestRetrySpec;
+
boolean retryDisabled;
+
SslProvider sslProvider;
URI uri;
String uriStr;
@@ -332,7 +339,7 @@ public WebsocketClientSpec websocketClientSpec() {
this.method = HttpMethod.GET;
this.protocols = new HttpProtocol[]{HttpProtocol.HTTP11};
this._protocols = h11;
- this.retryDisabled = false;
+ this.requestRetrySpec = Retry.max(1).filter(AbortedException::isConnectionReset);
}
HttpClientConfig(HttpClientConfig parent) {
@@ -361,6 +368,7 @@ public WebsocketClientSpec websocketClientSpec() {
this.redirectRequestBiConsumer = parent.redirectRequestBiConsumer;
this.redirectRequestConsumer = parent.redirectRequestConsumer;
this.responseTimeout = parent.responseTimeout;
+ this.requestRetrySpec = parent.requestRetrySpec;
this.retryDisabled = parent.retryDisabled;
this.sslProvider = parent.sslProvider;
this.uri = parent.uri;
diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConnect.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConnect.java
index 615374ed6a..094f73b875 100644
--- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConnect.java
+++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConnect.java
@@ -54,7 +54,6 @@
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.NettyOutbound;
-import reactor.netty.channel.AbortedException;
import reactor.netty.http.HttpOperations;
import reactor.netty.http.HttpProtocol;
import reactor.netty.resources.ConnectionProvider;
@@ -67,6 +66,7 @@
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;
import reactor.util.retry.Retry;
+import reactor.util.retry.RetrySpec;
import static reactor.netty.ReactorNetty.format;
import static reactor.netty.http.client.HttpClientState.STREAM_CONFIGURED;
@@ -208,7 +208,7 @@ static final class MonoHttpConnect extends Mono {
public void subscribe(CoreSubscriber super Connection> actual) {
HttpClientHandler handler = new HttpClientHandler(config);
- Mono.create(sink -> {
+ final Mono baseMono = Mono.create(sink -> {
HttpClientConfig _config = config;
//append secure handler if needed
@@ -269,8 +269,48 @@ public void subscribe(CoreSubscriber super Connection> actual) {
.acquire(_config, observer, handler, resolver)
.subscribe(new ClientTransportSubscriber(sink));
- }).retryWhen(Retry.indefinitely().filter(handler))
- .subscribe(actual);
+ }).retryWhen(Retry.indefinitely().filter(err -> {
+ if (err instanceof RedirectClientException) {
+ RedirectClientException re = (RedirectClientException)err;
+ if (HttpResponseStatus.SEE_OTHER.equals(re.status)) {
+ handler.method = HttpMethod.GET;
+ }
+ handler.redirect(re.location);
+ return true;
+ }
+ return false;
+ }));
+
+ // If request retry is enabled, the handler should guarantee no request data sent
+ final Mono finalMono = config.retryDisabled ? baseMono :
+ baseMono.retryWhen(new MyRetry(config.requestRetrySpec.modifyErrorFilter(handler::and), handler));
+
+ finalMono.subscribe(actual);
+ }
+
+ // TODO If (and this is a big if) this stays, then do a better job naming. Merging this with
+ // the handler in some way wouldn't be far off from how a lot of this project is designed..
+ static class MyRetry extends Retry {
+ private final Retry delegate;
+ private final HttpClientHandler handler;
+
+ MyRetry(final Retry delegate, final HttpClientHandler handler) {
+ this.delegate = delegate;
+ this.handler = handler;
+ }
+
+ @Override
+ public Publisher> generateCompanion(Flux retrySignals) {
+ return Flux.from(delegate.generateCompanion(retrySignals))
+ .map(i -> {
+ handler.retrying = true;
+ return i;
+ })
+ .onErrorMap(err -> true, err -> {
+ handler.retrying = false;
+ return err;
+ });
+ }
}
private void removeIncompatibleProtocol(HttpClientConfig config, HttpProtocol protocol) {
@@ -347,9 +387,9 @@ public void onUncaughtException(Connection connection, Throwable error) {
handler.previousRequestHeaders = ops.requestHeaders;
}
}
- else if (handler.shouldRetry && AbortedException.isConnectionReset(error)) {
+ else if (handler.requestRetrySpec.errorFilter.test(error)) {
HttpClientOperations ops = connection.as(HttpClientOperations.class);
- if (ops != null && ops.hasSentHeaders()) {
+ if (ops != null) {
// In some cases the channel close event may be delayed and thus the connection to be
// returned to the pool and later the eviction functionality to remote it from the pool.
// In some rare cases the connection might be acquired immediately, before the channel close
@@ -358,29 +398,15 @@ else if (handler.shouldRetry && AbortedException.isConnectionReset(error)) {
// Mark the connection as non-persistent here so that it is never returned to the pool and leave
// the channel close event to invalidate it.
ops.markPersistent(false);
- // Disable retry if the headers or/and the body were sent
- handler.shouldRetry = false;
- if (log.isWarnEnabled()) {
- log.warn(format(connection.channel(),
- "The connection observed an error, the request cannot be " +
- "retried as the headers/body were sent"), error);
- }
- }
- else {
- if (ops != null) {
- // In some cases the channel close event may be delayed and thus the connection to be
- // returned to the pool and later the eviction functionality to remote it from the pool.
- // In some rare cases the connection might be acquired immediately, before the channel close
- // event and the eviction functionality be able to remove it from the pool, this may lead to I/O
- // errors.
- // Mark the connection as non-persistent here so that it is never returned to the pool and leave
- // the channel close event to invalidate it.
- ops.markPersistent(false);
- ops.retrying = true;
- }
- if (log.isDebugEnabled()) {
- log.debug(format(connection.channel(),
- "The connection observed an error, the request will be retried"), error);
+
+ if (ops.hasSentHeaders()) {
+ // Signal to retry that headers or/and the body were sent
+ handler.requestDataSent = true;
+ if (log.isWarnEnabled()) {
+ log.warn(format(connection.channel(),
+ "The connection observed an error, the request cannot be " +
+ "retried as the headers/body were sent"), error);
+ }
}
}
}
@@ -397,7 +423,26 @@ else if (error instanceof SslClosedEngineException) {
else if (log.isWarnEnabled()) {
log.warn(format(connection.channel(), "The connection observed an error"), error);
}
+ // TODO So this is the underpinning that makes the current solution work, and I'm very concerned about it.
+ // Basically, handler.retrying is set to true/false in a custom Retry#generateCompanion, which is
+ // ultimately tied to the Mono associated with the sink. I don't know if there are any guarantees
+ // that lambda will fire synchronously. I could get around this by using a communication channel
+ // (blocking queue, mono, etc), but that is all kinds of icky and possibly deadlock prone.
+ //
+ // Ultimately, the retry state prior to this change was being managed inside these Observers. With
+ // that being pushed to a customizable retry config, it's not immediately clear how to get the fact
+ // that a retry is or is not happening back to these observers. My first attempt at solving this
+ // problem revolved around throwing a custom exception from a failed retry and using that as an indication
+ // that a retry was not happening. However, that has some negatives as well (and I'm not sure it
+ // was going to work).
+ //
+ // Anyhow, neither feels clean. Still trying to think of another way, but with the 30 minute increments
+ // I've been giving to this problem, I haven't come up with something that feels clean:(
sink.error(error);
+ HttpClientOperations ops = connection.as(HttpClientOperations.class);
+ if (ops != null) {
+ ops.retrying = handler.retrying;
+ }
}
@Override
@@ -468,10 +513,21 @@ static final class HttpClientHandler extends SocketAddress
volatile String resourceUrl;
volatile UriEndpoint fromURI;
volatile Supplier[] redirectedFrom;
- volatile boolean shouldRetry;
+
+ /**
+ * A {@link RetrySpec} that is tied to request submission. The implementation
+ * that leverages this is supposed to guarantee that no retry will happen if
+ * any HTTP request data is sent over the wire.
+ */
+ final RetrySpec requestRetrySpec;
+
+ volatile boolean requestDataSent;
+
+ volatile boolean retrying;
+
volatile HttpHeaders previousRequestHeaders;
- HttpClientHandler(HttpClientConfig configuration) {
+ HttpClientHandler(final HttpClientConfig configuration) {
this.method = configuration.method;
this.compress = configuration.acceptGzip;
this.followRedirectPredicate = configuration.followRedirectPredicate;
@@ -488,7 +544,7 @@ static final class HttpClientHandler extends SocketAddress
new UriEndpointFactory(configuration.remoteAddress(), configuration.isSecure(), URI_ADDRESS_MAPPER);
this.websocketClientSpec = configuration.websocketClientSpec;
- this.shouldRetry = !configuration.retryDisabled;
+ this.requestRetrySpec = configuration.requestRetrySpec;
this.handler = configuration.body;
if (configuration.uri == null) {
@@ -674,16 +730,7 @@ void channel(HttpClientOperations ops) {
@Override
public boolean test(Throwable throwable) {
- if (throwable instanceof RedirectClientException) {
- RedirectClientException re = (RedirectClientException) throwable;
- if (HttpResponseStatus.SEE_OTHER.equals(re.status)) {
- method = HttpMethod.GET;
- }
- redirect(re.location);
- return true;
- }
- if (shouldRetry && AbortedException.isConnectionReset(throwable)) {
- shouldRetry = false;
+ if (!requestDataSent) {
redirect(toURI.toString());
return true;
}
diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientOperations.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientOperations.java
index 9577c82f21..87fea0bea9 100644
--- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientOperations.java
+++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientOperations.java
@@ -124,7 +124,6 @@ class HttpClientOperations extends HttpOperations
HttpClientOperations(HttpClientOperations replaced) {
super(replaced);
this.started = replaced.started;
- this.retrying = replaced.retrying;
this.redirecting = replaced.redirecting;
this.redirectedFrom = replaced.redirectedFrom;
this.redirectRequestConsumer = replaced.redirectRequestConsumer;
diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java
index b3e4f66b80..23ce376fb1 100644
--- a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java
+++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java
@@ -1323,18 +1323,30 @@ private void doTestRetry(boolean retryDisabled, boolean expectRetry) throws Exce
})
.responseContent())
.expectErrorMatches(t -> {
- error.set(t);
- return t.getMessage() != null &&
- (t.getMessage().contains("Connection reset by peer") ||
- t.getMessage().contains("Connection reset") ||
- t.getMessage().contains("readAddress(..)") || // https://github.com/reactor/reactor-netty/issues/1673
- t.getMessage().contains("Connection prematurely closed BEFORE response"));
+ error.set(t);
+ // TODO Is it considered a contract change to return the retry exhausted wrapper?
+ // I'm thinking it is, but it does seem nice to indicate a retry happened and failed
+ // Will need reactor-netty folks to decide on this. If I hear nothing from them
+ // before moving out of draft, change things to preserve original behavior.
+ final Function msgAssertion = msg ->
+ msg.contains("Connection reset by peer") ||
+ msg.contains("Connection reset") ||
+ msg.contains("readAddress(..)") || // https://github.com/reactor/reactor-netty/issues/1673
+ msg.contains("Connection prematurely closed BEFORE response");
+
+ if (expectRetry) {
+ // TODO assert on private retries exhausted exception?
+ final Throwable cause = t.getCause();
+ return cause != null && msgAssertion.apply(cause.getMessage());
+ } else {
+ return msgAssertion.apply(t.getMessage());
+ }
})
.verify(Duration.ofSeconds(30));
int requestCount = 1;
int requestErrorCount = 1;
- if (expectRetry && !(error.get() instanceof PrematureCloseException)) {
+ if (expectRetry && !(error.get().getCause() instanceof PrematureCloseException)) {
requestCount = 2;
requestErrorCount = 2;
}