diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index a5fa0f80077..9e9e0359804 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -768,6 +768,19 @@ public void operationComplete(ChannelFuture future) throws Exception { } } }); + // When the HEADERS are not buffered because of MAX_CONCURRENT_STREAMS in + // StreamBufferingEncoder, the stream is created immediately even if the bytes of the HEADERS + // are delayed because the OS may have too much buffered and isn't accepting the write. The + // write promise is also delayed until flush(). However, we need to associate the netty stream + // with the transport state so that goingAway() and forcefulClose() and able to notify the + // stream of failures. + // + // This leaves a hole when MAX_CONCURRENT_STREAMS is reached, as http2Stream will be null, but + // it is better than nothing. + Http2Stream http2Stream = connection().stream(streamId); + if (http2Stream != null) { + http2Stream.setProperty(streamKey, stream); + } } /** diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java index f8fbeea9b82..ee458e177ac 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java @@ -453,6 +453,26 @@ public void receivedAbruptGoAwayShouldFailRacingQueuedStreamid() throws Exceptio assertTrue(future.isDone()); } + @Test + public void receivedAbruptGoAwayShouldFailRacingQueuedIoStreamid() throws Exception { + // Purposefully avoid flush(), since we want the write to not actually complete. + // EmbeddedChannel doesn't support flow control, so this is the next closest approximation. + ChannelFuture future = channel().write( + newCreateStreamCommand(grpcHeaders, streamTransportState)); + // Read a GOAWAY that indicates our stream can't be sent + channelRead(goAwayFrame(0, 0 /* NO_ERROR */, Unpooled.copiedBuffer("this is a test", UTF_8))); + + ArgumentCaptor captor = ArgumentCaptor.forClass(Status.class); + verify(streamListener).closed(captor.capture(), same(REFUSED), + ArgumentMatchers.notNull()); + assertEquals(Status.UNAVAILABLE.getCode(), captor.getValue().getCode()); + assertEquals( + "Abrupt GOAWAY closed sent stream. HTTP/2 error code: NO_ERROR, " + + "debug data: this is a test", + captor.getValue().getDescription()); + assertTrue(future.isDone()); + } + @Test public void receivedGoAway_shouldFailBufferedStreamsExceedingMaxConcurrentStreams() throws Exception {