From 6f8ad54d3f41dc03a3688018d7cd111bc3587d1e Mon Sep 17 00:00:00 2001 From: Nugraha Date: Fri, 22 Aug 2025 03:53:49 +0700 Subject: [PATCH 1/6] chore: wip 1 Signed-off-by: Nugraha --- duplex_http_call.go | 25 ++++++++++++++----------- features.go | 5 +++++ handler.go | 10 ++++++++++ protocol_grpc.go | 8 +++++--- 4 files changed, 34 insertions(+), 14 deletions(-) create mode 100644 features.go diff --git a/duplex_http_call.go b/duplex_http_call.go index 768a5bbb..4ee2bcf2 100644 --- a/duplex_http_call.go +++ b/duplex_http_call.go @@ -326,18 +326,21 @@ func (d *duplexHTTPCall) makeRequest() { _ = d.CloseWrite() return } - if (d.streamType&StreamTypeBidi) == StreamTypeBidi && response.ProtoMajor < 2 { - // If we somehow dialed an HTTP/1.x server, fail with an explicit message - // rather than returning a more cryptic error later on. - d.responseErr = errorf( - CodeUnimplemented, - "response from %v is HTTP/%d.%d: bidi streams require at least HTTP/2", - d.request.URL, - response.ProtoMajor, - response.ProtoMinor, - ) - _ = d.CloseWrite() + if DisallowBidiStreamingHttp11 { + if (d.streamType&StreamTypeBidi) == StreamTypeBidi && response.ProtoMajor < 2 { + // If we somehow dialed an HTTP/1.x server, fail with an explicit message + // rather than returning a more cryptic error later on. + d.responseErr = errorf( + CodeUnimplemented, + "response from %v is HTTP/%d.%d: bidi streams require at least HTTP/2", + d.request.URL, + response.ProtoMajor, + response.ProtoMinor, + ) + _ = d.CloseWrite() + } } + } // getNoBody is a GetBody function for http.NoBody. diff --git a/features.go b/features.go new file mode 100644 index 00000000..a6132edc --- /dev/null +++ b/features.go @@ -0,0 +1,5 @@ +package connect + +var ( + DisallowBidiStreamingHttp11 = true +) diff --git a/handler.go b/handler.go index 9f95627b..e55489f6 100644 --- a/handler.go +++ b/handler.go @@ -163,12 +163,22 @@ func (h *Handler) ServeHTTP(responseWriter http.ResponseWriter, request *http.Re // okay if we can't re-use the connection. isBidi := (h.spec.StreamType & StreamTypeBidi) == StreamTypeBidi if isBidi && request.ProtoMajor < 2 { + // Check if we allow bidi stream over HTTP/1.1, enable full-duplex support + // and if fail, we fallback to the default behaviour. + if !DisallowBidiStreamingHttp11 { + responseController := http.NewResponseController(responseWriter) + if err := responseController.EnableFullDuplex(); err == nil { + goto Pass + } + } + // Clients coded to expect full-duplex connections may hang if they've // mistakenly negotiated HTTP/1.1. To unblock them, we must close the // underlying TCP connection. responseWriter.Header().Set("Connection", "close") responseWriter.WriteHeader(http.StatusHTTPVersionNotSupported) return + Pass: } protocolHandlers := h.protocolHandlers[request.Method] diff --git a/protocol_grpc.go b/protocol_grpc.go index e10ecad7..3205513f 100644 --- a/protocol_grpc.go +++ b/protocol_grpc.go @@ -515,9 +515,11 @@ func (hc *grpcHandlerConn) Close(err error) (retErr error) { // a well-intentioned client may just not expect the server to be returning // an error for a streaming RPC. Better to accept that we can't always reuse // TCP connections. - closeErr := hc.request.Body.Close() - if retErr == nil { - retErr = closeErr + if DisallowBidiStreamingHttp11 { + closeErr := hc.request.Body.Close() + if retErr == nil { + retErr = closeErr + } } }() defer flushResponseWriter(hc.responseWriter) From c397f340aa2d3f9302da99c4614d4e7504314bfa Mon Sep 17 00:00:00 2001 From: Nugraha Date: Fri, 22 Aug 2025 03:54:08 +0700 Subject: [PATCH 2/6] chore: wip 2 Signed-off-by: Nugraha --- client.go | 4 ++++ connect_ext_test.go | 6 +++++- duplex_http_call.go | 7 ++++++- duplex_http_call_test.go | 1 + features.go | 6 +++--- handler.go | 10 +++++++++- option.go | 16 ++++++++++++++++ protocol.go | 4 ++++ protocol_connect.go | 9 ++++++++- protocol_grpc.go | 6 +++++- 10 files changed, 61 insertions(+), 8 deletions(-) diff --git a/client.go b/client.go index 1a7b1d64..88ca1883 100644 --- a/client.go +++ b/client.go @@ -65,6 +65,8 @@ func NewClient[Req, Res any](httpClient HTTPClient, url string, options ...Clien EnableGet: config.EnableGet, GetURLMaxBytes: config.GetURLMaxBytes, GetUseFallback: config.GetUseFallback, + + Experimental: config.Experimental, }, ) if protocolErr != nil { @@ -213,6 +215,8 @@ type clientConfig struct { GetURLMaxBytes int GetUseFallback bool IdempotencyLevel IdempotencyLevel + + Experimental ExperimentalFeatures } func newClientConfig(rawURL string, options []ClientOption) (*clientConfig, *Error) { diff --git a/connect_ext_test.go b/connect_ext_test.go index b93c5708..77b58568 100644 --- a/connect_ext_test.go +++ b/connect_ext_test.go @@ -2179,6 +2179,9 @@ func TestBidiOverHTTP1(t *testing.T) { client := pingv1connect.NewPingServiceClient( &http.Client{Transport: server.TransportHTTP1()}, server.URL(), + // connect.WithExperimental(connect.ExperimentalFeatures{ + // AllowBidiStreamOverHTTP11: true, + // }), ) stream := client.CumSum(context.Background()) // Stream creates an async request, can error on Send or Receive. @@ -2186,9 +2189,10 @@ func TestBidiOverHTTP1(t *testing.T) { assert.ErrorIs(t, err, io.EOF) } _, err := stream.Receive() - assert.NotNil(t, err) assert.Equal(t, connect.CodeOf(err), connect.CodeUnknown) + assert.NotNil(t, err) assert.Equal(t, err.Error(), "unknown: HTTP status 505 HTTP Version Not Supported") + assert.Nil(t, stream.CloseRequest()) assert.Nil(t, stream.CloseResponse()) } diff --git a/duplex_http_call.go b/duplex_http_call.go index 4ee2bcf2..c751c6b0 100644 --- a/duplex_http_call.go +++ b/duplex_http_call.go @@ -50,6 +50,8 @@ type duplexHTTPCall struct { responseReady chan struct{} response *http.Response responseErr error + + allowBidiStreamOverHTTP11 bool } func newDuplexHTTPCall( @@ -58,6 +60,7 @@ func newDuplexHTTPCall( url *url.URL, spec Spec, header http.Header, + features ExperimentalFeatures, ) *duplexHTTPCall { // ensure we make a copy of the url before we pass along to the // Request. This ensures if a transport out of our control wants @@ -87,6 +90,8 @@ func newDuplexHTTPCall( streamType: spec.StreamType, request: request, responseReady: make(chan struct{}), + + allowBidiStreamOverHTTP11: features.AllowBidiStreamOverHTTP11, } } @@ -326,7 +331,7 @@ func (d *duplexHTTPCall) makeRequest() { _ = d.CloseWrite() return } - if DisallowBidiStreamingHttp11 { + if !d.allowBidiStreamOverHTTP11 { if (d.streamType&StreamTypeBidi) == StreamTypeBidi && response.ProtoMajor < 2 { // If we somehow dialed an HTTP/1.x server, fail with an explicit message // rather than returning a more cryptic error later on. diff --git a/duplex_http_call_test.go b/duplex_http_call_test.go index cff1654d..99bea8f1 100644 --- a/duplex_http_call_test.go +++ b/duplex_http_call_test.go @@ -54,6 +54,7 @@ func TestHTTPCallGetBody(t *testing.T) { serverURL, Spec{StreamType: StreamTypeUnary}, http.Header{}, + ExperimentalFeatures{}, ) getBodyCalled := false call.onRequestSend = func(*http.Request) { diff --git a/features.go b/features.go index a6132edc..705b36b7 100644 --- a/features.go +++ b/features.go @@ -1,5 +1,5 @@ package connect -var ( - DisallowBidiStreamingHttp11 = true -) +type ExperimentalFeatures struct { + AllowBidiStreamOverHTTP11 bool +} diff --git a/handler.go b/handler.go index e55489f6..744a0b1b 100644 --- a/handler.go +++ b/handler.go @@ -31,6 +31,7 @@ type Handler struct { protocolHandlers map[string][]protocolHandler // Method to protocol handlers allowMethod string // Allow header acceptPost string // Accept-Post header + experimental ExperimentalFeatures } // NewUnaryHandler constructs a [Handler] for a request-response procedure. @@ -82,6 +83,8 @@ func NewUnaryHandler[Req, Res any]( protocolHandlers: mappedMethodHandlers(protocolHandlers), allowMethod: sortedAllowMethodValue(protocolHandlers), acceptPost: sortedAcceptPostValue(protocolHandlers), + + experimental: config.Experimental, } } @@ -165,7 +168,7 @@ func (h *Handler) ServeHTTP(responseWriter http.ResponseWriter, request *http.Re if isBidi && request.ProtoMajor < 2 { // Check if we allow bidi stream over HTTP/1.1, enable full-duplex support // and if fail, we fallback to the default behaviour. - if !DisallowBidiStreamingHttp11 { + if h.experimental.AllowBidiStreamOverHTTP11 { responseController := http.NewResponseController(responseWriter) if err := responseController.EnableFullDuplex(); err == nil { goto Pass @@ -262,6 +265,8 @@ type handlerConfig struct { ReadMaxBytes int SendMaxBytes int StreamType StreamType + + Experimental ExperimentalFeatures } func newHandlerConfig(procedure string, streamType StreamType, options []HandlerOption) *handlerConfig { @@ -314,6 +319,7 @@ func (c *handlerConfig) newProtocolHandlers() []protocolHandler { SendMaxBytes: c.SendMaxBytes, RequireConnectProtocolHeader: c.RequireConnectProtocolHeader, IdempotencyLevel: c.IdempotencyLevel, + Experimental: c.Experimental, })) } return handlers @@ -333,5 +339,7 @@ func newStreamHandler( protocolHandlers: mappedMethodHandlers(protocolHandlers), allowMethod: sortedAllowMethodValue(protocolHandlers), acceptPost: sortedAcceptPostValue(protocolHandlers), + + experimental: config.Experimental, } } diff --git a/option.go b/option.go index 703f8a58..55ccd8dd 100644 --- a/option.go +++ b/option.go @@ -230,6 +230,10 @@ func WithCodec(codec Codec) Option { return &codecOption{Codec: codec} } +func WithExperimental(features ExperimentalFeatures) Option { + return &experimentalOption{features} +} + // WithCompressMinBytes sets a minimum size threshold for compression: // regardless of compressor configuration, messages smaller than the configured // minimum are sent uncompressed. @@ -401,6 +405,18 @@ func (o *clientOptionsOption) applyToClient(config *clientConfig) { } } +type experimentalOption struct { + features ExperimentalFeatures +} + +func (o *experimentalOption) applyToClient(config *clientConfig) { + config.Experimental = o.features +} + +func (o *experimentalOption) applyToHandler(config *handlerConfig) { + config.Experimental = o.features +} + type codecOption struct { Codec Codec } diff --git a/protocol.go b/protocol.go index 9add614c..1005b51f 100644 --- a/protocol.go +++ b/protocol.go @@ -83,6 +83,8 @@ type protocolHandlerParams struct { SendMaxBytes int RequireConnectProtocolHeader bool IdempotencyLevel IdempotencyLevel + + Experimental ExperimentalFeatures } // Handler is the server side of a protocol. HTTP handlers typically support @@ -132,6 +134,8 @@ type protocolClientParams struct { // The gRPC family of protocols always needs access to a Protobuf codec to // marshal and unmarshal errors. Protobuf Codec + + Experimental ExperimentalFeatures } // Client is the client side of a protocol. HTTP clients typically use a single diff --git a/protocol_connect.go b/protocol_connect.go index 6828ab4d..5111c775 100644 --- a/protocol_connect.go +++ b/protocol_connect.go @@ -360,7 +360,14 @@ func (c *connectClient) NewConn( } // else effectively unbounded } } - duplexCall := newDuplexHTTPCall(ctx, c.HTTPClient, c.URL, spec, header) + duplexCall := newDuplexHTTPCall( + ctx, + c.HTTPClient, + c.URL, + spec, + header, + c.Experimental, + ) var conn streamingClientConn if spec.StreamType == StreamTypeUnary { unaryConn := &connectUnaryClientConn{ diff --git a/protocol_grpc.go b/protocol_grpc.go index 3205513f..5557c832 100644 --- a/protocol_grpc.go +++ b/protocol_grpc.go @@ -214,6 +214,7 @@ func (g *grpcHandler) NewConn( }, web: g.web, }, + experimental: g.Experimental, }) if failed != nil { // Negotiation failed, so we can't establish a stream. @@ -280,6 +281,7 @@ func (g *grpcClient) NewConn( g.URL, spec, header, + g.Experimental, ) conn := &grpcClientConn{ spec: spec, @@ -466,6 +468,8 @@ type grpcHandlerConn struct { wroteToBody bool request *http.Request unmarshaler grpcUnmarshaler + + experimental ExperimentalFeatures } func (hc *grpcHandlerConn) Spec() Spec { @@ -515,7 +519,7 @@ func (hc *grpcHandlerConn) Close(err error) (retErr error) { // a well-intentioned client may just not expect the server to be returning // an error for a streaming RPC. Better to accept that we can't always reuse // TCP connections. - if DisallowBidiStreamingHttp11 { + if !hc.experimental.AllowBidiStreamOverHTTP11 { closeErr := hc.request.Body.Close() if retErr == nil { retErr = closeErr From 1c3aeb1aa255aec8d0db77869a4cf0b49ba666ef Mon Sep 17 00:00:00 2001 From: Nugraha Date: Fri, 22 Aug 2025 04:07:20 +0700 Subject: [PATCH 3/6] chore: wip unit test ok Signed-off-by: Nugraha --- connect_ext_test.go | 86 +++++++++++++++++++++++++++++++++------------ 1 file changed, 63 insertions(+), 23 deletions(-) diff --git a/connect_ext_test.go b/connect_ext_test.go index 77b58568..ad072ecb 100644 --- a/connect_ext_test.go +++ b/connect_ext_test.go @@ -2169,32 +2169,72 @@ func TestWebXUserAgent(t *testing.T) { func TestBidiOverHTTP1(t *testing.T) { t.Parallel() - mux := http.NewServeMux() - mux.Handle(pingv1connect.NewPingServiceHandler(pingServer{})) - server := memhttptest.NewServer(t, mux) - // Clients expecting a full-duplex connection that end up with a simplex - // HTTP/1.1 connection shouldn't hang. Instead, the server should close the - // TCP connection. - client := pingv1connect.NewPingServiceClient( - &http.Client{Transport: server.TransportHTTP1()}, - server.URL(), - // connect.WithExperimental(connect.ExperimentalFeatures{ - // AllowBidiStreamOverHTTP11: true, - // }), - ) - stream := client.CumSum(context.Background()) - // Stream creates an async request, can error on Send or Receive. - if err := stream.Send(&pingv1.CumSumRequest{Number: 2}); err != nil { - assert.ErrorIs(t, err, io.EOF) + type testCase struct { + name string + handlerOptions []connect.HandlerOption + clientOptions []connect.ClientOption + validate func(*testing.T, *pingv1.CumSumResponse, error) } - _, err := stream.Receive() - assert.Equal(t, connect.CodeOf(err), connect.CodeUnknown) - assert.NotNil(t, err) - assert.Equal(t, err.Error(), "unknown: HTTP status 505 HTTP Version Not Supported") - assert.Nil(t, stream.CloseRequest()) - assert.Nil(t, stream.CloseResponse()) + for _, tc := range []testCase{ + { + name: "disallow bidi stream over http1", + handlerOptions: []connect.HandlerOption{}, + clientOptions: []connect.ClientOption{}, + validate: func(t *testing.T, csr *pingv1.CumSumResponse, err error) { + assert.Equal(t, connect.CodeOf(err), connect.CodeUnknown) + assert.NotNil(t, err) + assert.Equal(t, err.Error(), "unknown: HTTP status 505 HTTP Version Not Supported") + }, + }, + { + name: "allow bidi stream over http1", + handlerOptions: []connect.HandlerOption{ + connect.WithExperimental(connect.ExperimentalFeatures{ + AllowBidiStreamOverHTTP11: true, + }), + }, + clientOptions: []connect.ClientOption{ + connect.WithExperimental(connect.ExperimentalFeatures{ + AllowBidiStreamOverHTTP11: true, + }), + }, + validate: func(t *testing.T, csr *pingv1.CumSumResponse, err error) { + assert.NotNil(t, csr) + assert.Equal(t, 2, csr.Sum) + assert.Nil(t, err) + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + mux := http.NewServeMux() + mux.Handle(pingv1connect.NewPingServiceHandler( + pingServer{}, + tc.handlerOptions..., + )) + server := memhttptest.NewServer(t, mux) + + // Clients expecting a full-duplex connection that end up with a simplex + // HTTP/1.1 connection shouldn't hang. Instead, the server should close the + // TCP connection. + client := pingv1connect.NewPingServiceClient( + &http.Client{Transport: server.TransportHTTP1()}, + server.URL(), + tc.clientOptions..., + ) + stream := client.CumSum(context.Background()) + // Stream creates an async request, can error on Send or Receive. + if err := stream.Send(&pingv1.CumSumRequest{Number: 2}); err != nil { + assert.ErrorIs(t, err, io.EOF) + } + recvMsg, err := stream.Receive() + tc.validate(t, recvMsg, err) + + assert.Nil(t, stream.CloseRequest()) + assert.Nil(t, stream.CloseResponse()) + }) + } } func TestHandlerReturnsNilResponse(t *testing.T) { From f7336eec8224069b4a895d29a609522862f15063 Mon Sep 17 00:00:00 2001 From: Nugraha Date: Sat, 23 Aug 2025 01:16:33 +0700 Subject: [PATCH 4/6] feat: default experiment feature flag --- client.go | 1 + connect_ext_test.go | 14 +++++++++++--- features.go | 4 ++++ handler.go | 1 + 4 files changed, 17 insertions(+), 3 deletions(-) diff --git a/client.go b/client.go index 88ca1883..4e638f2e 100644 --- a/client.go +++ b/client.go @@ -231,6 +231,7 @@ func newClientConfig(rawURL string, options []ClientOption) (*clientConfig, *Err Procedure: protoPath, CompressionPools: make(map[string]*compressionPool), BufferPool: newBufferPool(), + Experimental: DefaultExperimentalFeatures, } withProtoBinaryCodec().applyToClient(&config) withGzip().applyToClient(&config) diff --git a/connect_ext_test.go b/connect_ext_test.go index ad072ecb..6b215d16 100644 --- a/connect_ext_test.go +++ b/connect_ext_test.go @@ -2179,9 +2179,17 @@ func TestBidiOverHTTP1(t *testing.T) { for _, tc := range []testCase{ { - name: "disallow bidi stream over http1", - handlerOptions: []connect.HandlerOption{}, - clientOptions: []connect.ClientOption{}, + name: "disallow bidi stream over http1", + handlerOptions: []connect.HandlerOption{ + connect.WithExperimental(connect.ExperimentalFeatures{ + AllowBidiStreamOverHTTP11: false, + }), + }, + clientOptions: []connect.ClientOption{ + connect.WithExperimental(connect.ExperimentalFeatures{ + AllowBidiStreamOverHTTP11: false, + }), + }, validate: func(t *testing.T, csr *pingv1.CumSumResponse, err error) { assert.Equal(t, connect.CodeOf(err), connect.CodeUnknown) assert.NotNil(t, err) diff --git a/features.go b/features.go index 705b36b7..bb74eda6 100644 --- a/features.go +++ b/features.go @@ -3,3 +3,7 @@ package connect type ExperimentalFeatures struct { AllowBidiStreamOverHTTP11 bool } + +var DefaultExperimentalFeatures = ExperimentalFeatures{ + AllowBidiStreamOverHTTP11: true, +} diff --git a/handler.go b/handler.go index 744a0b1b..df18067e 100644 --- a/handler.go +++ b/handler.go @@ -277,6 +277,7 @@ func newHandlerConfig(procedure string, streamType StreamType, options []Handler Codecs: make(map[string]Codec), BufferPool: newBufferPool(), StreamType: streamType, + Experimental: DefaultExperimentalFeatures, } withProtoBinaryCodec().applyToHandler(&config) withProtoJSONCodecs().applyToHandler(&config) From 010f67a652eafc344ebedfcd767b5263904d2b7e Mon Sep 17 00:00:00 2001 From: Nugraha Date: Sat, 23 Aug 2025 01:21:09 +0700 Subject: [PATCH 5/6] chore: try switch off by default --- features.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/features.go b/features.go index bb74eda6..91e73f16 100644 --- a/features.go +++ b/features.go @@ -5,5 +5,5 @@ type ExperimentalFeatures struct { } var DefaultExperimentalFeatures = ExperimentalFeatures{ - AllowBidiStreamOverHTTP11: true, + AllowBidiStreamOverHTTP11: false, } From d8e18d432402d03ecfc21111fe4b2bde7deb4101 Mon Sep 17 00:00:00 2001 From: Nugraha Date: Sat, 23 Aug 2025 01:32:21 +0700 Subject: [PATCH 6/6] chore: enable bidi stream h1, add test for grpc/web --- connect_ext_test.go | 38 ++++++++++++++++++++++++++++++++++++++ features.go | 2 +- protocol_grpc.go | 3 ++- 3 files changed, 41 insertions(+), 2 deletions(-) diff --git a/connect_ext_test.go b/connect_ext_test.go index 6b215d16..dadf862b 100644 --- a/connect_ext_test.go +++ b/connect_ext_test.go @@ -2214,6 +2214,44 @@ func TestBidiOverHTTP1(t *testing.T) { assert.Nil(t, err) }, }, + { + name: "allow bidi stream over http1 grpc", + handlerOptions: []connect.HandlerOption{ + connect.WithExperimental(connect.ExperimentalFeatures{ + AllowBidiStreamOverHTTP11: true, + }), + }, + clientOptions: []connect.ClientOption{ + connect.WithExperimental(connect.ExperimentalFeatures{ + AllowBidiStreamOverHTTP11: true, + }), + connect.WithGRPC(), + }, + validate: func(t *testing.T, csr *pingv1.CumSumResponse, err error) { + assert.NotNil(t, csr) + assert.Equal(t, 2, csr.Sum) + assert.Nil(t, err) + }, + }, + { + name: "allow bidi stream over http1 grpc-web", + handlerOptions: []connect.HandlerOption{ + connect.WithExperimental(connect.ExperimentalFeatures{ + AllowBidiStreamOverHTTP11: true, + }), + }, + clientOptions: []connect.ClientOption{ + connect.WithExperimental(connect.ExperimentalFeatures{ + AllowBidiStreamOverHTTP11: true, + }), + connect.WithGRPCWeb(), + }, + validate: func(t *testing.T, csr *pingv1.CumSumResponse, err error) { + assert.NotNil(t, csr) + assert.Equal(t, 2, csr.Sum) + assert.Nil(t, err) + }, + }, } { t.Run(tc.name, func(t *testing.T) { mux := http.NewServeMux() diff --git a/features.go b/features.go index 91e73f16..bb74eda6 100644 --- a/features.go +++ b/features.go @@ -5,5 +5,5 @@ type ExperimentalFeatures struct { } var DefaultExperimentalFeatures = ExperimentalFeatures{ - AllowBidiStreamOverHTTP11: false, + AllowBidiStreamOverHTTP11: true, } diff --git a/protocol_grpc.go b/protocol_grpc.go index 5557c832..842f0b3f 100644 --- a/protocol_grpc.go +++ b/protocol_grpc.go @@ -519,7 +519,8 @@ func (hc *grpcHandlerConn) Close(err error) (retErr error) { // a well-intentioned client may just not expect the server to be returning // an error for a streaming RPC. Better to accept that we can't always reuse // TCP connections. - if !hc.experimental.AllowBidiStreamOverHTTP11 { + + if !hc.experimental.AllowBidiStreamOverHTTP11 || hc.request.ProtoMajor > 1 { closeErr := hc.request.Body.Close() if retErr == nil { retErr = closeErr