Skip to content

netty: Associate netty stream eagerly to avoid client hang #12222

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions netty/src/main/java/io/grpc/netty/NettyClientHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,19 @@ public void operationComplete(ChannelFuture future) throws Exception {
}
}
});
// When the HEADERS are not buffered because of MAX_CONCURRENT_STREAMS in
// StreamBufferingEncoder, the stream is created immediately even if the bytes of the HEADERS
// are delayed because the OS may have too much buffered and isn't accepting the write. The
// write promise is also delayed until flush(). However, we need to associate the netty stream
// with the transport state so that goingAway() and forcefulClose() and able to notify the
// stream of failures.
//
// This leaves a hole when MAX_CONCURRENT_STREAMS is reached, as http2Stream will be null, but
// it is better than nothing.
Http2Stream http2Stream = connection().stream(streamId);
if (http2Stream != null) {
http2Stream.setProperty(streamKey, stream);
}
}

/**
Expand Down
20 changes: 20 additions & 0 deletions netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,26 @@ public void receivedAbruptGoAwayShouldFailRacingQueuedStreamid() throws Exceptio
assertTrue(future.isDone());
}

@Test
public void receivedAbruptGoAwayShouldFailRacingQueuedIoStreamid() throws Exception {
// Purposefully avoid flush(), since we want the write to not actually complete.
// EmbeddedChannel doesn't support flow control, so this is the next closest approximation.
ChannelFuture future = channel().write(
newCreateStreamCommand(grpcHeaders, streamTransportState));
// Read a GOAWAY that indicates our stream can't be sent
channelRead(goAwayFrame(0, 0 /* NO_ERROR */, Unpooled.copiedBuffer("this is a test", UTF_8)));

ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(streamListener).closed(captor.capture(), same(REFUSED),
ArgumentMatchers.<Metadata>notNull());
assertEquals(Status.UNAVAILABLE.getCode(), captor.getValue().getCode());
assertEquals(
"Abrupt GOAWAY closed sent stream. HTTP/2 error code: NO_ERROR, "
+ "debug data: this is a test",
captor.getValue().getDescription());
assertTrue(future.isDone());
}

@Test
public void receivedGoAway_shouldFailBufferedStreamsExceedingMaxConcurrentStreams()
throws Exception {
Expand Down