Skip to content

Commit

Permalink
Allow sending Connection: close header (#4531)
Browse files Browse the repository at this point in the history
* Allow sending `Connection: close` header

Motivation:

This PR aims to solve two problems with `Connection` header.

1. Allow sending `Connection` header field.

  HTTP/2 does not use the `Connection` header field.
  https://httpwg.org/specs/rfc9113.html#rfc.section.8.2.2
  Hence, `Connection` header is prohibited and it is automatically
  stripped from headers.
  https://github.com/line/armeria/blob/b127cd27252f6a454130f66d1175a06faed01f09/core/src/main/java/com/linecorp/armeria/client/ClientOptions.java#L132-L132

  There have been requets to close a connection after sending requests or
  responses on purpose or send `Connection: close` header for
  compatibility with legacy HTTP/1 servers.

  For HTTP/2, we can translate `Connection: close` as a signal to close
  a connection by sending a GOAWAY frame.
  It would be useful to rebalance HTTP/2 loads by closing
  connections after some periods.

2. Armeria server does not return `Connection:close` header when
   `Connection: close` is received.

  As a client sent `Connection: close` header, it seems fine not to
  return `Connection: close` header. However, returning the
  `Connection: close` header would be more compliant with HTTP/1.1
  protocol.

Modifications:

- Removed `CONNECTION` header from prohibited header names.
- Added `KeepAliveHandler.disconnectWhenFinished()` which is called when
  a `Connection: close` is specified in response headers.
- Modified `HttpServerHandler` to check whether to close a connection
  when responses have been written.
- Removed the singleton instance of `NoopKeepAliveHandler` and make to
  create an instance for each connection.
  - `NoopKeepAliveHandler` now has two fielders to know whether
    a channel needs immediate disconnection or the connection has to be
    closed after receving all responses in process.
- Fixed `HttpChannelPool` to determine if a session is healthy using
  `isActive()`. This change allows inflights requests to use
  HTTP/2 sessions before GOAWAY is sent or received.
- Fixed to correctly set keep alive headers for HTTP/1.1.

Result:

- You can now send `Connection: close` to close a connection after
  receiving a response.
- Fixes #4471
- Fixes #4454
- Fixes #4131

* Update core/src/test/java/com/linecorp/armeria/client/CountingConnectionPoolListener.java

Co-authored-by: jrhee17 <[email protected]>

* Rename isActive() into canAcquire() and invoked deactivate() whenever disconnectWhenFinish() is called

* Fix the flaky test

* Address comments by @jrhee17

* Disallow acquring when a connection is about to disconnect

* Allow only "connection: close" header

* Add comments for connection shutdown mode

* Address comments by @trustin

* Add assert

* Address comments by @jrhee17

* Fix recusive calls

* Remove cruft

---------

Co-authored-by: jrhee17 <[email protected]>
  • Loading branch information
ikhoon and jrhee17 committed Mar 8, 2023
1 parent 5854658 commit 20af195
Show file tree
Hide file tree
Showing 37 changed files with 783 additions and 279 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.linecorp.armeria.client;

import static com.linecorp.armeria.client.HttpSessionHandler.MAX_NUM_REQUESTS_SENT;
import static com.linecorp.armeria.internal.common.HttpHeadersUtil.CLOSE_STRING;
import static com.linecorp.armeria.internal.common.HttpHeadersUtil.mergeRequestHeaders;

import java.util.concurrent.ScheduledFuture;
Expand All @@ -28,6 +29,7 @@
import com.linecorp.armeria.client.HttpResponseDecoder.HttpResponseWrapper;
import com.linecorp.armeria.common.ClosedSessionException;
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpHeaderNames;
import com.linecorp.armeria.common.HttpHeaders;
import com.linecorp.armeria.common.HttpObject;
import com.linecorp.armeria.common.RequestHeaders;
Expand Down Expand Up @@ -155,10 +157,9 @@ final boolean tryInitialize() {
" in one connection. ID: " + id);
} else {
exception = new ClosedSessionException(
"Can't send requests. ID: " + id + ", session active: " + session.isActive() +
", response needs to disconnect: " + responseDecoder.needsToDisconnectWhenFinished());
"Can't send requests. ID: " + id + ", session active: " + session.isAcquirable());
}
responseDecoder.disconnectWhenFinished();
session.deactivate();
// No need to send RST because we didn't send any packet and this will be disconnected anyway.
fail(UnprocessedRequestException.of(exception));
return false;
Expand Down Expand Up @@ -202,6 +203,17 @@ final void writeHeaders(RequestHeaders headers) {
final RequestHeaders merged = mergeRequestHeaders(
headers, ctx.defaultRequestHeaders(), ctx.additionalRequestHeaders());
logBuilder.requestHeaders(merged);

final String connectionOption = headers.get(HttpHeaderNames.CONNECTION);
if (CLOSE_STRING.equalsIgnoreCase(connectionOption)) {
// Make the session unhealthy so that subsequent requests do not use it.
// In HTTP/2 request, the "Connection: close" is just interpreted as a signal to close the
// connection by sending a GOAWAY frame that will be sent after receiving the corresponding
// response from the remote peer. The "Connection: close" header is stripped when it is converted to
// a Netty HTTP/2 header.
session.deactivate();
}

final ChannelPromise promise = ch.newPromise();
// Attach a listener first to make the listener early handle a cause raised while writing headers
// before any other callbacks like `onStreamClosed()` are invoked.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.linecorp.armeria.internal.common.ArmeriaHttpUtil;
import com.linecorp.armeria.internal.common.Http1ObjectEncoder;
import com.linecorp.armeria.internal.common.KeepAliveHandler;
import com.linecorp.armeria.internal.common.NoopKeepAliveHandler;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
Expand All @@ -42,13 +41,13 @@
final class ClientHttp1ObjectEncoder extends Http1ObjectEncoder implements ClientHttpObjectEncoder {

private final Http1HeaderNaming http1HeaderNaming;
private final KeepAliveHandler keepAliveHandler;

// A proper keepAliveHandler will be set by setKeepAliveHandler()
private KeepAliveHandler keepAliveHandler = NoopKeepAliveHandler.INSTANCE;

ClientHttp1ObjectEncoder(Channel ch, SessionProtocol protocol, Http1HeaderNaming http1HeaderNaming) {
ClientHttp1ObjectEncoder(Channel ch, SessionProtocol protocol, Http1HeaderNaming http1HeaderNaming,
KeepAliveHandler keepAliveHandler) {
super(ch, protocol);
this.http1HeaderNaming = http1HeaderNaming;
this.keepAliveHandler = keepAliveHandler;
}

@Override
Expand Down Expand Up @@ -114,13 +113,9 @@ public KeepAliveHandler keepAliveHandler() {
return keepAliveHandler;
}

void setKeepAliveHandler(KeepAliveHandler keepAliveHandler) {
assert keepAliveHandler instanceof Http1ClientKeepAliveHandler;
this.keepAliveHandler = keepAliveHandler;
}

@Override
protected boolean isPing(int id) {
final KeepAliveHandler keepAliveHandler = keepAliveHandler();
return keepAliveHandler instanceof Http1ClientKeepAliveHandler &&
((Http1ClientKeepAliveHandler) keepAliveHandler).isPing(id);
}
Expand Down
23 changes: 21 additions & 2 deletions core/src/main/java/com/linecorp/armeria/client/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.linecorp.armeria.client;

import static com.linecorp.armeria.internal.common.HttpHeadersUtil.CLOSE_STRING;
import static java.util.Objects.requireNonNull;

import java.util.Arrays;
Expand Down Expand Up @@ -129,7 +130,6 @@ public final class ClientOptions
ClientOption.define("ENDPOINT_REMAPPER", Function.identity());

private static final List<AsciiString> PROHIBITED_HEADER_NAMES = ImmutableList.of(
HttpHeaderNames.CONNECTION,
HttpHeaderNames.HTTP2_SETTINGS,
HttpHeaderNames.METHOD,
HttpHeaderNames.PATH,
Expand All @@ -155,7 +155,26 @@ public final class ClientOptions
throw new IllegalArgumentException("prohibited header name: " + name);
}
}
return newHeaders;

boolean hasUnnormalizedCloseValue = false;
for (String connectionOption : newHeaders.getAll(HttpHeaderNames.CONNECTION)) {
// - Disallow connection headers apart from "Connection: close".
// - Connection options are case-insensitive.
if ("close".equalsIgnoreCase(connectionOption)) {
if (!"close".equals(connectionOption)) {
hasUnnormalizedCloseValue = true;
}
} else {
throw new IllegalArgumentException(
"prohibited 'Connection' header value: " + connectionOption);
}
}

if (hasUnnormalizedCloseValue) {
return newHeaders.toBuilder().set(HttpHeaderNames.CONNECTION, CLOSE_STRING).build();
} else {
return newHeaders;
}
}, (oldValue, newValue) -> {
final HttpHeaders newHeaders = newValue.value();
if (newHeaders.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.linecorp.armeria.common.HttpHeaderNames;
import com.linecorp.armeria.common.HttpMethod;
import com.linecorp.armeria.common.RequestHeaders;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.internal.client.UserAgentUtil;
import com.linecorp.armeria.internal.common.Http1KeepAliveHandler;

Expand All @@ -37,23 +38,28 @@ final class Http1ClientKeepAliveHandler extends Http1KeepAliveHandler {
.build();

private final HttpSession httpSession;
private final ClientHttp1ObjectEncoder encoder;
private final Http1ResponseDecoder decoder;
@Nullable
private ClientHttp1ObjectEncoder encoder;

Http1ClientKeepAliveHandler(Channel channel, ClientHttp1ObjectEncoder encoder, Http1ResponseDecoder decoder,
Http1ClientKeepAliveHandler(Channel channel, Http1ResponseDecoder decoder,
Timer keepAliveTimer, long idleTimeoutMillis, long pingIntervalMillis,
long maxConnectionAgeMillis, int maxNumRequestsPerConnection) {
super(channel, "client", keepAliveTimer, idleTimeoutMillis,
pingIntervalMillis, maxConnectionAgeMillis, maxNumRequestsPerConnection);
httpSession = HttpSession.get(requireNonNull(channel, "channel"));
this.encoder = requireNonNull(encoder, "encoder");
this.decoder = requireNonNull(decoder, "decoder");
}

void setEncoder(ClientHttp1ObjectEncoder encoder) {
this.encoder = requireNonNull(encoder, "encoder");
}

@Override
protected ChannelFuture writePing(ChannelHandlerContext ctx) {
final int id = httpSession.incrementAndGetNumRequestsSent();

assert encoder != null;
decoder.setPingReqId(id);
final ChannelFuture future = encoder.writeHeaders(id, 0, HTTP1_PING_REQUEST, true, ctx.newPromise());
ctx.flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.linecorp.armeria.internal.common.ArmeriaHttpUtil;
import com.linecorp.armeria.internal.common.InboundTrafficController;
import com.linecorp.armeria.internal.common.KeepAliveHandler;
import com.linecorp.armeria.internal.common.NoopKeepAliveHandler;
import com.linecorp.armeria.internal.common.util.TemporaryThreadLocals;

import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -61,7 +60,8 @@ private enum State {
/** The response being decoded currently. */
@Nullable
private HttpResponseWrapper res;
private KeepAliveHandler keepAliveHandler = NoopKeepAliveHandler.INSTANCE;
@Nullable
private KeepAliveHandler keepAliveHandler;
private int resId = 1;
private int lastPingReqId = -1;
private State state = State.NEED_HEADERS;
Expand Down Expand Up @@ -162,7 +162,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}

if (!HttpUtil.isKeepAlive(nettyRes)) {
disconnectWhenFinished();
session().deactivate();
}

final HttpResponseWrapper res = getResponse(resId);
Expand Down Expand Up @@ -320,25 +320,35 @@ KeepAliveHandler keepAliveHandler() {
}

void setKeepAliveHandler(ChannelHandlerContext ctx, KeepAliveHandler keepAliveHandler) {
assert keepAliveHandler instanceof Http1ClientKeepAliveHandler;
this.keepAliveHandler = keepAliveHandler;
maybeInitializeKeepAliveHandler(ctx);
if (keepAliveHandler instanceof Http1ClientKeepAliveHandler) {
maybeInitializeKeepAliveHandler(ctx);
}
}

private void maybeInitializeKeepAliveHandler(ChannelHandlerContext ctx) {
if (ctx.channel().isActive()) {
keepAliveHandler.initialize(ctx);
final KeepAliveHandler keepAliveHandler = keepAliveHandler();
if (keepAliveHandler != null) {
keepAliveHandler.initialize(ctx);
}
}
}

private void destroyKeepAliveHandler() {
keepAliveHandler.destroy();
final KeepAliveHandler keepAliveHandler = keepAliveHandler();
if (keepAliveHandler != null) {
keepAliveHandler.destroy();
}
}

private void onPingRead(Object msg) {
if (msg instanceof HttpResponse) {
assert keepAliveHandler != NoopKeepAliveHandler.INSTANCE;
keepAliveHandler.onPing();
final KeepAliveHandler keepAliveHandler = keepAliveHandler();
// Ping can not be activated with NoopKeepAliveHandler.
if (keepAliveHandler instanceof Http1ClientKeepAliveHandler) {
keepAliveHandler.onPing();
}
}
if (msg instanceof LastHttpContent) {
onPingComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ private static KeepAliveHandler newKeepAliveHandler(
idleTimeoutMillis, pingIntervalMillis, maxConnectionAgeMillis, maxNumRequestsPerConnection);

if (!needsKeepAliveHandler) {
return NoopKeepAliveHandler.INSTANCE;
return new NoopKeepAliveHandler();
}

final Timer keepAliveTimer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;

import javax.annotation.Nonnull;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -163,15 +165,14 @@ public void onStreamRemoved(Http2Stream stream) {}

@Override
public void onGoAwaySent(int lastStreamId, long errorCode, ByteBuf debugData) {
disconnectWhenFinished();
session().deactivate();
goAwayHandler.onGoAwaySent(channel(), lastStreamId, errorCode, debugData);
}

@Override
public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) {
// Should not reuse a connection that received a GOAWAY frame.
HttpSession.get(channel()).deactivate();
disconnectWhenFinished();
session().deactivate();
goAwayHandler.onGoAwayReceived(channel(), lastStreamId, errorCode, debugData);
}

Expand Down Expand Up @@ -347,15 +348,14 @@ public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int wind
public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
ByteBuf payload) {}

@Nonnull
@Override
KeepAliveHandler keepAliveHandler() {
return keepAliveHandler;
}

private void keepAliveChannelRead() {
if (keepAliveHandler != null) {
keepAliveHandler.onReadOrWrite();
}
keepAliveHandler.onReadOrWrite();
}

private static int streamIdToId(int streamId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ private PooledChannel acquireNowExact(PoolKey key, SessionProtocol protocol) {

private static boolean isHealthy(PooledChannel pooledChannel) {
final Channel ch = pooledChannel.get();
return ch.isActive() && HttpSession.get(ch).canSendRequest();
return ch.isActive() && HttpSession.get(ch).isAcquirable();
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ abstract class HttpResponseDecoder {
private final Channel channel;
private final InboundTrafficController inboundTrafficController;

@Nullable
private HttpSession httpSession;

private int unfinishedResponses;
private boolean disconnectWhenFinished;
private boolean closing;

HttpResponseDecoder(Channel channel, InboundTrafficController inboundTrafficController) {
Expand All @@ -84,7 +86,10 @@ HttpResponseWrapper addResponse(
new HttpResponseWrapper(res, ctx, responseTimeoutMillis, maxContentLength);
final HttpResponseWrapper oldRes = responses.put(id, newRes);

keepAliveHandler().increaseNumRequests();
final KeepAliveHandler keepAliveHandler = keepAliveHandler();
if (keepAliveHandler != null) {
keepAliveHandler.increaseNumRequests();
}

assert oldRes == null : "addResponse(" + id + ", " + res + ", " + responseTimeoutMillis + "): " +
oldRes;
Expand Down Expand Up @@ -145,18 +150,18 @@ final void failUnfinishedResponses(Throwable cause) {
}
}

abstract KeepAliveHandler keepAliveHandler();

final void disconnectWhenFinished() {
disconnectWhenFinished = true;
HttpSession session() {
if (httpSession != null) {
return httpSession;
}
return httpSession = HttpSession.get(channel);
}

final boolean needsToDisconnectNow() {
return needsToDisconnectWhenFinished() && !hasUnfinishedResponses();
}
@Nullable
abstract KeepAliveHandler keepAliveHandler();

final boolean needsToDisconnectWhenFinished() {
return disconnectWhenFinished || keepAliveHandler().needToCloseConnection();
final boolean needsToDisconnectNow() {
return !session().isAcquirable() && !hasUnfinishedResponses();
}

static final class HttpResponseWrapper implements StreamWriter<HttpObject> {
Expand Down
Loading

0 comments on commit 20af195

Please sign in to comment.