Configurable request retry #2453

Closed
@@ -15,6 +15,7 @@
*/
package reactor.netty.http.client;

import java.io.IOException;
import java.net.SocketAddress;
import java.net.URI;
import java.time.Duration;
@@ -25,6 +26,7 @@
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

import io.netty.buffer.ByteBuf;
@@ -49,6 +51,7 @@
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.NettyOutbound;
import reactor.netty.channel.AbortedException;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.http.Http2SettingsSpec;
import reactor.netty.http.HttpProtocol;
@@ -60,6 +63,7 @@
import reactor.netty.transport.ClientTransport;
import reactor.util.Metrics;
import reactor.util.annotation.Nullable;
import reactor.util.retry.RetrySpec;

/**
* An HttpClient allows building in a safe immutable way an http client that is
@@ -634,11 +638,11 @@ public final RequestSender delete() {
}

/**
* Option to disable {@code retry once} support for the outgoing requests that fail with
* {@link reactor.netty.channel.AbortedException#isConnectionReset(Throwable)}.
* <p>By default this is set to false in which case {@code retry once} is enabled.
* Option to disable the {@code request retry} configured via {@link #requestRetry(RetrySpec)}.
* See that method's javadoc for the default retry behavior.
* <p>By default this is set to false.
*
* @param disableRetry true to disable {@code retry once}, false to enable it
* @param disableRetry true to disable {@code request retry}, false to enable it
*
* @return a new {@link HttpClient}
* @since 0.9.6
@@ -652,6 +656,21 @@ public final HttpClient disableRetry(boolean disableRetry) {
return dup;
}

/**
* Option to customize {@code request retry} behavior. If any HTTP request data
* (headers, body, etc.) is sent, the request will not be retried regardless of
* this configuration. This can be disabled via {@link #disableRetry(boolean)}.
*
* <p>This defaults to {@code retry once} for outgoing requests that fail with
* {@link reactor.netty.channel.AbortedException#isConnectionReset(Throwable)}.
*/
public final HttpClient requestRetry(final RetrySpec requestRetry) {
Objects.requireNonNull(requestRetry, "requestRetry");
HttpClient dup = duplicate();
dup.configuration().requestRetrySpec = requestRetry;
return dup;
}

/**
* Setup a callback called when {@link HttpClientRequest} has been sent
* and {@link HttpClientState#REQUEST_SENT} has been emitted.
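For illustration, a possible way a caller might combine the requestRetry(RetrySpec) method proposed above with the existing disableRetry(boolean). This is a sketch against the draft API in this PR; the class name and URL are placeholders, not part of the diff.

import java.io.IOException;

import reactor.netty.http.client.HttpClient;
import reactor.util.retry.Retry;

public class RequestRetryUsageSketch {

    public static void main(String[] args) {
        // Allow up to three connect-time retries for IO failures instead of the
        // default "retry once on connection reset" (requestRetry is the draft API).
        HttpClient client = HttpClient.create()
                .requestRetry(Retry.max(3).filter(IOException.class::isInstance));

        // Alternatively, opt out of request retry entirely (existing API).
        HttpClient noRetry = HttpClient.create()
                .disableRetry(true);

        String body = client.get()
                            .uri("https://example.com/")
                            .responseContent()
                            .aggregate()
                            .asString()
                            .block();
        System.out.println(body);
        // noRetry is used the same way; it simply never retries a failed request.
    }
}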
@@ -68,6 +68,7 @@
import reactor.netty.NettyOutbound;
import reactor.netty.NettyPipeline;
import reactor.netty.ReactorNetty;
import reactor.netty.channel.AbortedException;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.channel.ChannelOperations;
import reactor.netty.http.Http2SettingsSpec;
@@ -83,6 +84,8 @@
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;
import reactor.util.retry.Retry;
import reactor.util.retry.RetrySpec;

import static reactor.netty.ReactorNetty.format;
import static reactor.netty.http.client.Http2ConnectionProvider.OWNER;
@@ -314,7 +317,11 @@ public WebsocketClientSpec websocketClientSpec() {
BiConsumer<HttpHeaders, HttpClientRequest> redirectRequestBiConsumer;
Consumer<HttpClientRequest> redirectRequestConsumer;
Duration responseTimeout;

RetrySpec requestRetrySpec;

boolean retryDisabled;

SslProvider sslProvider;
URI uri;
String uriStr;
@@ -332,7 +339,7 @@ public WebsocketClientSpec websocketClientSpec() {
this.method = HttpMethod.GET;
this.protocols = new HttpProtocol[]{HttpProtocol.HTTP11};
this._protocols = h11;
this.retryDisabled = false;
this.requestRetrySpec = Retry.max(1).filter(AbortedException::isConnectionReset);
}

HttpClientConfig(HttpClientConfig parent) {
@@ -361,6 +368,7 @@ public WebsocketClientSpec websocketClientSpec() {
this.redirectRequestBiConsumer = parent.redirectRequestBiConsumer;
this.redirectRequestConsumer = parent.redirectRequestConsumer;
this.responseTimeout = parent.responseTimeout;
this.requestRetrySpec = parent.requestRetrySpec;
this.retryDisabled = parent.retryDisabled;
this.sslProvider = parent.sslProvider;
this.uri = parent.uri;
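The constructor above now seeds requestRetrySpec with Retry.max(1).filter(AbortedException::isConnectionReset), which mirrors the previously hard-coded retry-once-on-connection-reset behavior. Below is a minimal standalone sketch of how a RetrySpec of that shape behaves; it uses a plain IOException and filter so it does not depend on reactor-netty internals, and the class name is illustrative.

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

public class RetryOnceSketch {

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();

        // Fail the first attempt, succeed on the second; Retry.max(1) grants exactly
        // one retry and the filter restricts which errors qualify for it.
        String result = Mono.defer(() -> attempts.incrementAndGet() == 1
                        ? Mono.<String>error(new IOException("simulated connection reset"))
                        : Mono.just("ok on attempt " + attempts.get()))
                .retryWhen(Retry.max(1).filter(IOException.class::isInstance))
                .block();

        System.out.println(result); // prints "ok on attempt 2"
    }
}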
@@ -54,7 +54,6 @@
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.NettyOutbound;
import reactor.netty.channel.AbortedException;
import reactor.netty.http.HttpOperations;
import reactor.netty.http.HttpProtocol;
import reactor.netty.resources.ConnectionProvider;
@@ -67,6 +66,7 @@
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;
import reactor.util.retry.Retry;
import reactor.util.retry.RetrySpec;

import static reactor.netty.ReactorNetty.format;
import static reactor.netty.http.client.HttpClientState.STREAM_CONFIGURED;
@@ -208,7 +208,7 @@ static final class MonoHttpConnect extends Mono<Connection> {
public void subscribe(CoreSubscriber<? super Connection> actual) {
HttpClientHandler handler = new HttpClientHandler(config);

Mono.<Connection>create(sink -> {
final Mono<Connection> baseMono = Mono.<Connection>create(sink -> {
HttpClientConfig _config = config;

//append secure handler if needed
@@ -269,8 +269,48 @@ public void subscribe(CoreSubscriber<? super Connection> actual) {
.acquire(_config, observer, handler, resolver)
.subscribe(new ClientTransportSubscriber(sink));

}).retryWhen(Retry.indefinitely().filter(handler))
.subscribe(actual);
}).retryWhen(Retry.indefinitely().filter(err -> {
if (err instanceof RedirectClientException) {
RedirectClientException re = (RedirectClientException)err;
if (HttpResponseStatus.SEE_OTHER.equals(re.status)) {
handler.method = HttpMethod.GET;
}
handler.redirect(re.location);
return true;
}
return false;
}));

// If request retry is enabled, the handler should guarantee no request data sent
final Mono<Connection> finalMono = config.retryDisabled ? baseMono :
baseMono.retryWhen(new MyRetry(config.requestRetrySpec.modifyErrorFilter(handler::and), handler));

finalMono.subscribe(actual);
}

// TODO If (and this is a big if) this stays, then do a better job naming. Merging this with
// the handler in some way wouldn't be far off from how a lot of this project is designed..
static class MyRetry extends Retry {
private final Retry delegate;
private final HttpClientHandler handler;

MyRetry(final Retry delegate, final HttpClientHandler handler) {
this.delegate = delegate;
this.handler = handler;
}

@Override
public Publisher<?> generateCompanion(Flux<RetrySignal> retrySignals) {
return Flux.from(delegate.generateCompanion(retrySignals))
.map(i -> {
handler.retrying = true;
return i;
})
.onErrorMap(err -> true, err -> {
handler.retrying = false;
return err;
});
}
}
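MyRetry above decorates the configured RetrySpec (whose error filter is first narrowed with modifyErrorFilter(handler::and)) and hooks Retry#generateCompanion so the handler learns that a retry was granted. Below is a standalone sketch of the same decoration pattern, with an AtomicBoolean standing in for the handler flag; all names are illustrative and not part of the diff.

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

public class ObservableRetrySketch {

    // Wraps a Retry and records whenever its companion emits, i.e. whenever
    // another attempt is granted.
    static Retry observing(Retry delegate, AtomicBoolean retrying) {
        return new Retry() {
            @Override
            public Publisher<?> generateCompanion(Flux<RetrySignal> retrySignals) {
                return Flux.from(delegate.generateCompanion(retrySignals))
                           .doOnNext(signal -> retrying.set(true));
            }
        };
    }

    public static void main(String[] args) {
        AtomicBoolean retrying = new AtomicBoolean();
        AtomicInteger attempts = new AtomicInteger();

        Mono.defer(() -> attempts.incrementAndGet() == 1
                        ? Mono.<String>error(new IOException("boom"))
                        : Mono.just("ok"))
            .retryWhen(observing(Retry.max(1), retrying))
            .block();

        System.out.println("retried? " + retrying.get()); // prints "retried? true"
    }
}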

private void removeIncompatibleProtocol(HttpClientConfig config, HttpProtocol protocol) {
Expand Down Expand Up @@ -347,9 +387,9 @@ public void onUncaughtException(Connection connection, Throwable error) {
handler.previousRequestHeaders = ops.requestHeaders;
}
}
else if (handler.shouldRetry && AbortedException.isConnectionReset(error)) {
else if (handler.requestRetrySpec.errorFilter.test(error)) {
HttpClientOperations ops = connection.as(HttpClientOperations.class);
if (ops != null && ops.hasSentHeaders()) {
if (ops != null) {
// In some cases the channel close event may be delayed and thus the connection to be
// returned to the pool and later the eviction functionality to remove it from the pool.
// In some rare cases the connection might be acquired immediately, before the channel close
@@ -358,29 +398,15 @@ else if (handler.shouldRetry && AbortedException.isConnectionReset(error)) {
// Mark the connection as non-persistent here so that it is never returned to the pool and leave
// the channel close event to invalidate it.
ops.markPersistent(false);
// Disable retry if the headers or/and the body were sent
handler.shouldRetry = false;
if (log.isWarnEnabled()) {
log.warn(format(connection.channel(),
"The connection observed an error, the request cannot be " +
"retried as the headers/body were sent"), error);
}
}
else {
if (ops != null) {
// In some cases the channel close event may be delayed and thus the connection to be
// returned to the pool and later the eviction functionality to remove it from the pool.
// In some rare cases the connection might be acquired immediately, before the channel close
// event and the eviction functionality be able to remove it from the pool, this may lead to I/O
// errors.
// Mark the connection as non-persistent here so that it is never returned to the pool and leave
// the channel close event to invalidate it.
ops.markPersistent(false);
ops.retrying = true;
}
if (log.isDebugEnabled()) {
log.debug(format(connection.channel(),
"The connection observed an error, the request will be retried"), error);

if (ops.hasSentHeaders()) {
// Signal to retry that headers or/and the body were sent
handler.requestDataSent = true;
if (log.isWarnEnabled()) {
log.warn(format(connection.channel(),
"The connection observed an error, the request cannot be " +
"retried as the headers/body were sent"), error);
}
}
}
}
@@ -397,7 +423,26 @@ else if (error instanceof SslClosedEngineException) {
else if (log.isWarnEnabled()) {
log.warn(format(connection.channel(), "The connection observed an error"), error);
}
// TODO So this is the underpinning that makes the current solution work, and I'm very concerned about it.
// Basically, handler.retrying is set to true/false in a custom Retry#generateCompanion, which is
// ultimately tied to the Mono associated with the sink. I don't know if there are any guarantees
// that lambda will fire synchronously. I could get around this by using a communication channel
// (blocking queue, mono, etc), but that is all kinds of icky and possibly deadlock prone.
//
// Ultimately, the retry state prior to this change was being managed inside these Observers. With
// that being pushed to a customizable retry config, it's not immediately clear how to get the fact
// that a retry is or is not happening back to these observers. My first attempt at solving this
// problem revolved around throwing a custom exception from a failed retry and using that as an indication
// that a retry was not happening. However, that has some negatives as well (and I'm not sure it
// was going to work).
//
// Anyhow, neither feels clean. Still trying to think of another way, but with the 30 minute increments
// I've been giving to this problem, I haven't come up with something that feels clean:(
sink.error(error);
HttpClientOperations ops = connection.as(HttpClientOperations.class);
if (ops != null) {
ops.retrying = handler.retrying;
}
}

@Override
Expand Down Expand Up @@ -468,10 +513,21 @@ static final class HttpClientHandler extends SocketAddress
volatile String resourceUrl;
volatile UriEndpoint fromURI;
volatile Supplier<String>[] redirectedFrom;
volatile boolean shouldRetry;

/**
* A {@link RetrySpec} that is tied to request submission. The implementation
* that leverages this is supposed to guarantee that no retry will happen if
* any HTTP request data is sent over the wire.
*/
final RetrySpec requestRetrySpec;

volatile boolean requestDataSent;

volatile boolean retrying;

volatile HttpHeaders previousRequestHeaders;

HttpClientHandler(HttpClientConfig configuration) {
HttpClientHandler(final HttpClientConfig configuration) {
this.method = configuration.method;
this.compress = configuration.acceptGzip;
this.followRedirectPredicate = configuration.followRedirectPredicate;
@@ -488,7 +544,7 @@ static final class HttpClientHandler extends SocketAddress
new UriEndpointFactory(configuration.remoteAddress(), configuration.isSecure(), URI_ADDRESS_MAPPER);

this.websocketClientSpec = configuration.websocketClientSpec;
this.shouldRetry = !configuration.retryDisabled;
this.requestRetrySpec = configuration.requestRetrySpec;
this.handler = configuration.body;

if (configuration.uri == null) {
@@ -674,16 +730,7 @@ void channel(HttpClientOperations ops) {

@Override
public boolean test(Throwable throwable) {
if (throwable instanceof RedirectClientException) {
RedirectClientException re = (RedirectClientException) throwable;
if (HttpResponseStatus.SEE_OTHER.equals(re.status)) {
method = HttpMethod.GET;
}
redirect(re.location);
return true;
}
if (shouldRetry && AbortedException.isConnectionReset(throwable)) {
shouldRetry = false;
if (!requestDataSent) {
redirect(toURI.toString());
return true;
}
@@ -124,7 +124,6 @@ class HttpClientOperations extends HttpOperations<NettyInbound, NettyOutbound>
HttpClientOperations(HttpClientOperations replaced) {
super(replaced);
this.started = replaced.started;
this.retrying = replaced.retrying;
this.redirecting = replaced.redirecting;
this.redirectedFrom = replaced.redirectedFrom;
this.redirectRequestConsumer = replaced.redirectRequestConsumer;
@@ -1323,18 +1323,30 @@ private void doTestRetry(boolean retryDisabled, boolean expectRetry) throws Exce
})
.responseContent())
.expectErrorMatches(t -> {
error.set(t);
return t.getMessage() != null &&
(t.getMessage().contains("Connection reset by peer") ||
t.getMessage().contains("Connection reset") ||
t.getMessage().contains("readAddress(..)") || // https://github.com/reactor/reactor-netty/issues/1673
t.getMessage().contains("Connection prematurely closed BEFORE response"));
error.set(t);
// TODO Is it considered a contract change to return the retry exhausted wrapper?
// I'm thinking it is, but it does seem nice to indicate a retry happened and failed
// Will need reactor-netty folks to decide on this. If I hear nothing from them
// before moving out of draft, change things to preserve original behavior.
final Function<String, Boolean> msgAssertion = msg ->
msg.contains("Connection reset by peer") ||
msg.contains("Connection reset") ||
msg.contains("readAddress(..)") || // https://github.com/reactor/reactor-netty/issues/1673
msg.contains("Connection prematurely closed BEFORE response");

if (expectRetry) {
// TODO assert on private retries exhausted exception?
final Throwable cause = t.getCause();
return cause != null && msgAssertion.apply(cause.getMessage());
} else {
return msgAssertion.apply(t.getMessage());
}
})
.verify(Duration.ofSeconds(30));

int requestCount = 1;
int requestErrorCount = 1;
if (expectRetry && !(error.get() instanceof PrematureCloseException)) {
if (expectRetry && !(error.get().getCause() instanceof PrematureCloseException)) {
requestCount = 2;
requestErrorCount = 2;
}
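On the TODO above about whether surfacing a retry-exhausted wrapper is a contract change: with a plain RetrySpec, reactor-core's default exhaustion handling wraps the last failure in a "retries exhausted" exception, which is why the updated assertion inspects t.getCause() when a retry is expected. A minimal sketch of that reactor-core behavior in isolation; nothing below is specific to this PR and the class name is illustrative.

import java.io.IOException;

import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

public class RetryExhaustedSketch {

    public static void main(String[] args) {
        Mono.error(new IOException("Connection reset by peer"))
            .retryWhen(Retry.max(1).filter(IOException.class::isInstance))
            .subscribe(
                    value -> {},
                    t -> {
                        // Terminal error is Reactor's retry-exhausted wrapper
                        // (message like "Retries exhausted: 1/1"); the last
                        // underlying failure is kept as its cause.
                        System.out.println(t.getMessage());
                        System.out.println(t.getCause().getMessage()); // Connection reset by peer
                    });
    }
}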