diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClient.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClient.java index 0cb2da1a3a..943f13156c 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClient.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClient.java @@ -15,6 +15,7 @@ */ package reactor.netty.http.client; +import java.io.IOException; import java.net.SocketAddress; import java.net.URI; import java.time.Duration; @@ -25,6 +26,7 @@ import java.util.function.BiPredicate; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Predicate; import java.util.function.Supplier; import io.netty.buffer.ByteBuf; @@ -49,6 +51,7 @@ import reactor.netty.Connection; import reactor.netty.ConnectionObserver; import reactor.netty.NettyOutbound; +import reactor.netty.channel.AbortedException; import reactor.netty.channel.ChannelMetricsRecorder; import reactor.netty.http.Http2SettingsSpec; import reactor.netty.http.HttpProtocol; @@ -60,6 +63,7 @@ import reactor.netty.transport.ClientTransport; import reactor.util.Metrics; import reactor.util.annotation.Nullable; +import reactor.util.retry.RetrySpec; /** * An HttpClient allows building in a safe immutable way an http client that is @@ -634,11 +638,11 @@ public final RequestSender delete() { } /** - * Option to disable {@code retry once} support for the outgoing requests that fail with - * {@link reactor.netty.channel.AbortedException#isConnectionReset(Throwable)}. - *

By default this is set to false in which case {@code retry once} is enabled. + * Option to disable {@code request retry} established by {@link #requestRetry(RetrySpec)}. + * See its doc for any default retry behavior! + *

By default this is set to false. * - * @param disableRetry true to disable {@code retry once}, false to enable it + * @param disableRetry true to disable {@code request retry}, false to enable it * * @return a new {@link HttpClient} * @since 0.9.6 @@ -652,6 +656,21 @@ public final HttpClient disableRetry(boolean disableRetry) { return dup; } + /** + * Option to customize {@code request retry} behavior. If any HTTP request data + * (headers, body, etc.) is sent, the request will not be retried regardless of + * this configuration. This can be disabled via {@link #disableRetry(boolean)}. + * + *

This defaults to {@code retry once} for outgoing requests that fail with + * {@link reactor.netty.channel.AbortedException#isConnectionReset(Throwable)}. + */ + public final HttpClient requestRetry(final RetrySpec requestRetry) { + Objects.requireNonNull(requestRetry, "requestRetry"); + HttpClient dup = duplicate(); + dup.configuration().requestRetrySpec = requestRetry; + return dup; + } + /** * Setup a callback called when {@link HttpClientRequest} has been sent * and {@link HttpClientState#REQUEST_SENT} has been emitted. diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java index 062517539d..515996372a 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java @@ -68,6 +68,7 @@ import reactor.netty.NettyOutbound; import reactor.netty.NettyPipeline; import reactor.netty.ReactorNetty; +import reactor.netty.channel.AbortedException; import reactor.netty.channel.ChannelMetricsRecorder; import reactor.netty.channel.ChannelOperations; import reactor.netty.http.Http2SettingsSpec; @@ -83,6 +84,8 @@ import reactor.util.Loggers; import reactor.util.annotation.Nullable; import reactor.util.context.Context; +import reactor.util.retry.Retry; +import reactor.util.retry.RetrySpec; import static reactor.netty.ReactorNetty.format; import static reactor.netty.http.client.Http2ConnectionProvider.OWNER; @@ -314,7 +317,11 @@ public WebsocketClientSpec websocketClientSpec() { BiConsumer redirectRequestBiConsumer; Consumer redirectRequestConsumer; Duration responseTimeout; + + RetrySpec requestRetrySpec; + boolean retryDisabled; + SslProvider sslProvider; URI uri; String uriStr; @@ -332,7 +339,7 @@ public WebsocketClientSpec websocketClientSpec() { this.method = HttpMethod.GET; this.protocols = new HttpProtocol[]{HttpProtocol.HTTP11}; this._protocols = h11; - this.retryDisabled = false; + this.requestRetrySpec = Retry.max(1).filter(AbortedException::isConnectionReset); } HttpClientConfig(HttpClientConfig parent) { @@ -361,6 +368,7 @@ public WebsocketClientSpec websocketClientSpec() { this.redirectRequestBiConsumer = parent.redirectRequestBiConsumer; this.redirectRequestConsumer = parent.redirectRequestConsumer; this.responseTimeout = parent.responseTimeout; + this.requestRetrySpec = parent.requestRetrySpec; this.retryDisabled = parent.retryDisabled; this.sslProvider = parent.sslProvider; this.uri = parent.uri; diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConnect.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConnect.java index 615374ed6a..094f73b875 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConnect.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConnect.java @@ -54,7 +54,6 @@ import reactor.netty.Connection; import reactor.netty.ConnectionObserver; import reactor.netty.NettyOutbound; -import reactor.netty.channel.AbortedException; import reactor.netty.http.HttpOperations; import reactor.netty.http.HttpProtocol; import reactor.netty.resources.ConnectionProvider; @@ -67,6 +66,7 @@ import reactor.util.annotation.Nullable; import reactor.util.context.Context; import reactor.util.retry.Retry; +import reactor.util.retry.RetrySpec; import static reactor.netty.ReactorNetty.format; import static reactor.netty.http.client.HttpClientState.STREAM_CONFIGURED; @@ -208,7 +208,7 @@ static final class MonoHttpConnect extends Mono { public void subscribe(CoreSubscriber actual) { HttpClientHandler handler = new HttpClientHandler(config); - Mono.create(sink -> { + final Mono baseMono = Mono.create(sink -> { HttpClientConfig _config = config; //append secure handler if needed @@ -269,8 +269,48 @@ public void subscribe(CoreSubscriber actual) { .acquire(_config, observer, handler, resolver) .subscribe(new ClientTransportSubscriber(sink)); - }).retryWhen(Retry.indefinitely().filter(handler)) - .subscribe(actual); + }).retryWhen(Retry.indefinitely().filter(err -> { + if (err instanceof RedirectClientException) { + RedirectClientException re = (RedirectClientException)err; + if (HttpResponseStatus.SEE_OTHER.equals(re.status)) { + handler.method = HttpMethod.GET; + } + handler.redirect(re.location); + return true; + } + return false; + })); + + // If request retry is enabled, the handler should guarantee no request data sent + final Mono finalMono = config.retryDisabled ? baseMono : + baseMono.retryWhen(new MyRetry(config.requestRetrySpec.modifyErrorFilter(handler::and), handler)); + + finalMono.subscribe(actual); + } + + // TODO If (and this is a big if) this stays, then do a better job naming. Merging this with + // the handler in some way wouldn't be far off from how a lot of this project is designed.. + static class MyRetry extends Retry { + private final Retry delegate; + private final HttpClientHandler handler; + + MyRetry(final Retry delegate, final HttpClientHandler handler) { + this.delegate = delegate; + this.handler = handler; + } + + @Override + public Publisher generateCompanion(Flux retrySignals) { + return Flux.from(delegate.generateCompanion(retrySignals)) + .map(i -> { + handler.retrying = true; + return i; + }) + .onErrorMap(err -> true, err -> { + handler.retrying = false; + return err; + }); + } } private void removeIncompatibleProtocol(HttpClientConfig config, HttpProtocol protocol) { @@ -347,9 +387,9 @@ public void onUncaughtException(Connection connection, Throwable error) { handler.previousRequestHeaders = ops.requestHeaders; } } - else if (handler.shouldRetry && AbortedException.isConnectionReset(error)) { + else if (handler.requestRetrySpec.errorFilter.test(error)) { HttpClientOperations ops = connection.as(HttpClientOperations.class); - if (ops != null && ops.hasSentHeaders()) { + if (ops != null) { // In some cases the channel close event may be delayed and thus the connection to be // returned to the pool and later the eviction functionality to remote it from the pool. // In some rare cases the connection might be acquired immediately, before the channel close @@ -358,29 +398,15 @@ else if (handler.shouldRetry && AbortedException.isConnectionReset(error)) { // Mark the connection as non-persistent here so that it is never returned to the pool and leave // the channel close event to invalidate it. ops.markPersistent(false); - // Disable retry if the headers or/and the body were sent - handler.shouldRetry = false; - if (log.isWarnEnabled()) { - log.warn(format(connection.channel(), - "The connection observed an error, the request cannot be " + - "retried as the headers/body were sent"), error); - } - } - else { - if (ops != null) { - // In some cases the channel close event may be delayed and thus the connection to be - // returned to the pool and later the eviction functionality to remote it from the pool. - // In some rare cases the connection might be acquired immediately, before the channel close - // event and the eviction functionality be able to remove it from the pool, this may lead to I/O - // errors. - // Mark the connection as non-persistent here so that it is never returned to the pool and leave - // the channel close event to invalidate it. - ops.markPersistent(false); - ops.retrying = true; - } - if (log.isDebugEnabled()) { - log.debug(format(connection.channel(), - "The connection observed an error, the request will be retried"), error); + + if (ops.hasSentHeaders()) { + // Signal to retry that headers or/and the body were sent + handler.requestDataSent = true; + if (log.isWarnEnabled()) { + log.warn(format(connection.channel(), + "The connection observed an error, the request cannot be " + + "retried as the headers/body were sent"), error); + } } } } @@ -397,7 +423,26 @@ else if (error instanceof SslClosedEngineException) { else if (log.isWarnEnabled()) { log.warn(format(connection.channel(), "The connection observed an error"), error); } + // TODO So this is the underpinning that makes the current solution work, and I'm very concerned about it. + // Basically, handler.retrying is set to true/false in a custom Retry#generateCompanion, which is + // ultimately tied to the Mono associated with the sink. I don't know if there are any guarantees + // that lambda will fire synchronously. I could get around this by using a communication channel + // (blocking queue, mono, etc), but that is all kinds of icky and possibly deadlock prone. + // + // Ultimately, the retry state prior to this change was being managed inside these Observers. With + // that being pushed to a customizable retry config, it's not immediately clear how to get the fact + // that a retry is or is not happening back to these observers. My first attempt at solving this + // problem revolved around throwing a custom exception from a failed retry and using that as an indication + // that a retry was not happening. However, that has some negatives as well (and I'm not sure it + // was going to work). + // + // Anyhow, neither feels clean. Still trying to think of another way, but with the 30 minute increments + // I've been giving to this problem, I haven't come up with something that feels clean:( sink.error(error); + HttpClientOperations ops = connection.as(HttpClientOperations.class); + if (ops != null) { + ops.retrying = handler.retrying; + } } @Override @@ -468,10 +513,21 @@ static final class HttpClientHandler extends SocketAddress volatile String resourceUrl; volatile UriEndpoint fromURI; volatile Supplier[] redirectedFrom; - volatile boolean shouldRetry; + + /** + * A {@link RetrySpec} that is tied to request submission. The implementation + * that leverages this is supposed to guarantee that no retry will happen if + * any HTTP request data is sent over the wire. + */ + final RetrySpec requestRetrySpec; + + volatile boolean requestDataSent; + + volatile boolean retrying; + volatile HttpHeaders previousRequestHeaders; - HttpClientHandler(HttpClientConfig configuration) { + HttpClientHandler(final HttpClientConfig configuration) { this.method = configuration.method; this.compress = configuration.acceptGzip; this.followRedirectPredicate = configuration.followRedirectPredicate; @@ -488,7 +544,7 @@ static final class HttpClientHandler extends SocketAddress new UriEndpointFactory(configuration.remoteAddress(), configuration.isSecure(), URI_ADDRESS_MAPPER); this.websocketClientSpec = configuration.websocketClientSpec; - this.shouldRetry = !configuration.retryDisabled; + this.requestRetrySpec = configuration.requestRetrySpec; this.handler = configuration.body; if (configuration.uri == null) { @@ -674,16 +730,7 @@ void channel(HttpClientOperations ops) { @Override public boolean test(Throwable throwable) { - if (throwable instanceof RedirectClientException) { - RedirectClientException re = (RedirectClientException) throwable; - if (HttpResponseStatus.SEE_OTHER.equals(re.status)) { - method = HttpMethod.GET; - } - redirect(re.location); - return true; - } - if (shouldRetry && AbortedException.isConnectionReset(throwable)) { - shouldRetry = false; + if (!requestDataSent) { redirect(toURI.toString()); return true; } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientOperations.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientOperations.java index 9577c82f21..87fea0bea9 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientOperations.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientOperations.java @@ -124,7 +124,6 @@ class HttpClientOperations extends HttpOperations HttpClientOperations(HttpClientOperations replaced) { super(replaced); this.started = replaced.started; - this.retrying = replaced.retrying; this.redirecting = replaced.redirecting; this.redirectedFrom = replaced.redirectedFrom; this.redirectRequestConsumer = replaced.redirectRequestConsumer; diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java index b3e4f66b80..23ce376fb1 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java @@ -1323,18 +1323,30 @@ private void doTestRetry(boolean retryDisabled, boolean expectRetry) throws Exce }) .responseContent()) .expectErrorMatches(t -> { - error.set(t); - return t.getMessage() != null && - (t.getMessage().contains("Connection reset by peer") || - t.getMessage().contains("Connection reset") || - t.getMessage().contains("readAddress(..)") || // https://github.com/reactor/reactor-netty/issues/1673 - t.getMessage().contains("Connection prematurely closed BEFORE response")); + error.set(t); + // TODO Is it considered a contract change to return the retry exhausted wrapper? + // I'm thinking it is, but it does seem nice to indicate a retry happened and failed + // Will need reactor-netty folks to decide on this. If I hear nothing from them + // before moving out of draft, change things to preserve original behavior. + final Function msgAssertion = msg -> + msg.contains("Connection reset by peer") || + msg.contains("Connection reset") || + msg.contains("readAddress(..)") || // https://github.com/reactor/reactor-netty/issues/1673 + msg.contains("Connection prematurely closed BEFORE response"); + + if (expectRetry) { + // TODO assert on private retries exhausted exception? + final Throwable cause = t.getCause(); + return cause != null && msgAssertion.apply(cause.getMessage()); + } else { + return msgAssertion.apply(t.getMessage()); + } }) .verify(Duration.ofSeconds(30)); int requestCount = 1; int requestErrorCount = 1; - if (expectRetry && !(error.get() instanceof PrematureCloseException)) { + if (expectRetry && !(error.get().getCause() instanceof PrematureCloseException)) { requestCount = 2; requestErrorCount = 2; }