diff --git a/client.go b/client.go index 1a7b1d64..4e638f2e 100644 --- a/client.go +++ b/client.go @@ -65,6 +65,8 @@ func NewClient[Req, Res any](httpClient HTTPClient, url string, options ...Clien EnableGet: config.EnableGet, GetURLMaxBytes: config.GetURLMaxBytes, GetUseFallback: config.GetUseFallback, + + Experimental: config.Experimental, }, ) if protocolErr != nil { @@ -213,6 +215,8 @@ type clientConfig struct { GetURLMaxBytes int GetUseFallback bool IdempotencyLevel IdempotencyLevel + + Experimental ExperimentalFeatures } func newClientConfig(rawURL string, options []ClientOption) (*clientConfig, *Error) { @@ -227,6 +231,7 @@ func newClientConfig(rawURL string, options []ClientOption) (*clientConfig, *Err Procedure: protoPath, CompressionPools: make(map[string]*compressionPool), BufferPool: newBufferPool(), + Experimental: DefaultExperimentalFeatures, } withProtoBinaryCodec().applyToClient(&config) withGzip().applyToClient(&config) diff --git a/connect_ext_test.go b/connect_ext_test.go index b93c5708..dadf862b 100644 --- a/connect_ext_test.go +++ b/connect_ext_test.go @@ -2169,28 +2169,118 @@ func TestWebXUserAgent(t *testing.T) { func TestBidiOverHTTP1(t *testing.T) { t.Parallel() - mux := http.NewServeMux() - mux.Handle(pingv1connect.NewPingServiceHandler(pingServer{})) - server := memhttptest.NewServer(t, mux) - // Clients expecting a full-duplex connection that end up with a simplex - // HTTP/1.1 connection shouldn't hang. Instead, the server should close the - // TCP connection. - client := pingv1connect.NewPingServiceClient( - &http.Client{Transport: server.TransportHTTP1()}, - server.URL(), - ) - stream := client.CumSum(context.Background()) - // Stream creates an async request, can error on Send or Receive. - if err := stream.Send(&pingv1.CumSumRequest{Number: 2}); err != nil { - assert.ErrorIs(t, err, io.EOF) + type testCase struct { + name string + handlerOptions []connect.HandlerOption + clientOptions []connect.ClientOption + validate func(*testing.T, *pingv1.CumSumResponse, error) + } + + for _, tc := range []testCase{ + { + name: "disallow bidi stream over http1", + handlerOptions: []connect.HandlerOption{ + connect.WithExperimental(connect.ExperimentalFeatures{ + AllowBidiStreamOverHTTP11: false, + }), + }, + clientOptions: []connect.ClientOption{ + connect.WithExperimental(connect.ExperimentalFeatures{ + AllowBidiStreamOverHTTP11: false, + }), + }, + validate: func(t *testing.T, csr *pingv1.CumSumResponse, err error) { + assert.Equal(t, connect.CodeOf(err), connect.CodeUnknown) + assert.NotNil(t, err) + assert.Equal(t, err.Error(), "unknown: HTTP status 505 HTTP Version Not Supported") + }, + }, + { + name: "allow bidi stream over http1", + handlerOptions: []connect.HandlerOption{ + connect.WithExperimental(connect.ExperimentalFeatures{ + AllowBidiStreamOverHTTP11: true, + }), + }, + clientOptions: []connect.ClientOption{ + connect.WithExperimental(connect.ExperimentalFeatures{ + AllowBidiStreamOverHTTP11: true, + }), + }, + validate: func(t *testing.T, csr *pingv1.CumSumResponse, err error) { + assert.NotNil(t, csr) + assert.Equal(t, 2, csr.Sum) + assert.Nil(t, err) + }, + }, + { + name: "allow bidi stream over http1 grpc", + handlerOptions: []connect.HandlerOption{ + connect.WithExperimental(connect.ExperimentalFeatures{ + AllowBidiStreamOverHTTP11: true, + }), + }, + clientOptions: []connect.ClientOption{ + connect.WithExperimental(connect.ExperimentalFeatures{ + AllowBidiStreamOverHTTP11: true, + }), + connect.WithGRPC(), + }, + validate: func(t *testing.T, csr *pingv1.CumSumResponse, err error) { + assert.NotNil(t, csr) + assert.Equal(t, 2, csr.Sum) + assert.Nil(t, err) + }, + }, + { + name: "allow bidi stream over http1 grpc-web", + handlerOptions: []connect.HandlerOption{ + connect.WithExperimental(connect.ExperimentalFeatures{ + AllowBidiStreamOverHTTP11: true, + }), + }, + clientOptions: []connect.ClientOption{ + connect.WithExperimental(connect.ExperimentalFeatures{ + AllowBidiStreamOverHTTP11: true, + }), + connect.WithGRPCWeb(), + }, + validate: func(t *testing.T, csr *pingv1.CumSumResponse, err error) { + assert.NotNil(t, csr) + assert.Equal(t, 2, csr.Sum) + assert.Nil(t, err) + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + mux := http.NewServeMux() + mux.Handle(pingv1connect.NewPingServiceHandler( + pingServer{}, + tc.handlerOptions..., + )) + server := memhttptest.NewServer(t, mux) + + // Clients expecting a full-duplex connection that end up with a simplex + // HTTP/1.1 connection shouldn't hang. Instead, the server should close the + // TCP connection. + client := pingv1connect.NewPingServiceClient( + &http.Client{Transport: server.TransportHTTP1()}, + server.URL(), + tc.clientOptions..., + ) + stream := client.CumSum(context.Background()) + // Stream creates an async request, can error on Send or Receive. + if err := stream.Send(&pingv1.CumSumRequest{Number: 2}); err != nil { + assert.ErrorIs(t, err, io.EOF) + } + recvMsg, err := stream.Receive() + tc.validate(t, recvMsg, err) + + assert.Nil(t, stream.CloseRequest()) + assert.Nil(t, stream.CloseResponse()) + }) } - _, err := stream.Receive() - assert.NotNil(t, err) - assert.Equal(t, connect.CodeOf(err), connect.CodeUnknown) - assert.Equal(t, err.Error(), "unknown: HTTP status 505 HTTP Version Not Supported") - assert.Nil(t, stream.CloseRequest()) - assert.Nil(t, stream.CloseResponse()) } func TestHandlerReturnsNilResponse(t *testing.T) { diff --git a/duplex_http_call.go b/duplex_http_call.go index 768a5bbb..c751c6b0 100644 --- a/duplex_http_call.go +++ b/duplex_http_call.go @@ -50,6 +50,8 @@ type duplexHTTPCall struct { responseReady chan struct{} response *http.Response responseErr error + + allowBidiStreamOverHTTP11 bool } func newDuplexHTTPCall( @@ -58,6 +60,7 @@ func newDuplexHTTPCall( url *url.URL, spec Spec, header http.Header, + features ExperimentalFeatures, ) *duplexHTTPCall { // ensure we make a copy of the url before we pass along to the // Request. This ensures if a transport out of our control wants @@ -87,6 +90,8 @@ func newDuplexHTTPCall( streamType: spec.StreamType, request: request, responseReady: make(chan struct{}), + + allowBidiStreamOverHTTP11: features.AllowBidiStreamOverHTTP11, } } @@ -326,18 +331,21 @@ func (d *duplexHTTPCall) makeRequest() { _ = d.CloseWrite() return } - if (d.streamType&StreamTypeBidi) == StreamTypeBidi && response.ProtoMajor < 2 { - // If we somehow dialed an HTTP/1.x server, fail with an explicit message - // rather than returning a more cryptic error later on. - d.responseErr = errorf( - CodeUnimplemented, - "response from %v is HTTP/%d.%d: bidi streams require at least HTTP/2", - d.request.URL, - response.ProtoMajor, - response.ProtoMinor, - ) - _ = d.CloseWrite() + if !d.allowBidiStreamOverHTTP11 { + if (d.streamType&StreamTypeBidi) == StreamTypeBidi && response.ProtoMajor < 2 { + // If we somehow dialed an HTTP/1.x server, fail with an explicit message + // rather than returning a more cryptic error later on. + d.responseErr = errorf( + CodeUnimplemented, + "response from %v is HTTP/%d.%d: bidi streams require at least HTTP/2", + d.request.URL, + response.ProtoMajor, + response.ProtoMinor, + ) + _ = d.CloseWrite() + } } + } // getNoBody is a GetBody function for http.NoBody. diff --git a/duplex_http_call_test.go b/duplex_http_call_test.go index cff1654d..99bea8f1 100644 --- a/duplex_http_call_test.go +++ b/duplex_http_call_test.go @@ -54,6 +54,7 @@ func TestHTTPCallGetBody(t *testing.T) { serverURL, Spec{StreamType: StreamTypeUnary}, http.Header{}, + ExperimentalFeatures{}, ) getBodyCalled := false call.onRequestSend = func(*http.Request) { diff --git a/features.go b/features.go new file mode 100644 index 00000000..bb74eda6 --- /dev/null +++ b/features.go @@ -0,0 +1,9 @@ +package connect + +type ExperimentalFeatures struct { + AllowBidiStreamOverHTTP11 bool +} + +var DefaultExperimentalFeatures = ExperimentalFeatures{ + AllowBidiStreamOverHTTP11: true, +} diff --git a/handler.go b/handler.go index 9f95627b..df18067e 100644 --- a/handler.go +++ b/handler.go @@ -31,6 +31,7 @@ type Handler struct { protocolHandlers map[string][]protocolHandler // Method to protocol handlers allowMethod string // Allow header acceptPost string // Accept-Post header + experimental ExperimentalFeatures } // NewUnaryHandler constructs a [Handler] for a request-response procedure. @@ -82,6 +83,8 @@ func NewUnaryHandler[Req, Res any]( protocolHandlers: mappedMethodHandlers(protocolHandlers), allowMethod: sortedAllowMethodValue(protocolHandlers), acceptPost: sortedAcceptPostValue(protocolHandlers), + + experimental: config.Experimental, } } @@ -163,12 +166,22 @@ func (h *Handler) ServeHTTP(responseWriter http.ResponseWriter, request *http.Re // okay if we can't re-use the connection. isBidi := (h.spec.StreamType & StreamTypeBidi) == StreamTypeBidi if isBidi && request.ProtoMajor < 2 { + // Check if we allow bidi stream over HTTP/1.1, enable full-duplex support + // and if fail, we fallback to the default behaviour. + if h.experimental.AllowBidiStreamOverHTTP11 { + responseController := http.NewResponseController(responseWriter) + if err := responseController.EnableFullDuplex(); err == nil { + goto Pass + } + } + // Clients coded to expect full-duplex connections may hang if they've // mistakenly negotiated HTTP/1.1. To unblock them, we must close the // underlying TCP connection. responseWriter.Header().Set("Connection", "close") responseWriter.WriteHeader(http.StatusHTTPVersionNotSupported) return + Pass: } protocolHandlers := h.protocolHandlers[request.Method] @@ -252,6 +265,8 @@ type handlerConfig struct { ReadMaxBytes int SendMaxBytes int StreamType StreamType + + Experimental ExperimentalFeatures } func newHandlerConfig(procedure string, streamType StreamType, options []HandlerOption) *handlerConfig { @@ -262,6 +277,7 @@ func newHandlerConfig(procedure string, streamType StreamType, options []Handler Codecs: make(map[string]Codec), BufferPool: newBufferPool(), StreamType: streamType, + Experimental: DefaultExperimentalFeatures, } withProtoBinaryCodec().applyToHandler(&config) withProtoJSONCodecs().applyToHandler(&config) @@ -304,6 +320,7 @@ func (c *handlerConfig) newProtocolHandlers() []protocolHandler { SendMaxBytes: c.SendMaxBytes, RequireConnectProtocolHeader: c.RequireConnectProtocolHeader, IdempotencyLevel: c.IdempotencyLevel, + Experimental: c.Experimental, })) } return handlers @@ -323,5 +340,7 @@ func newStreamHandler( protocolHandlers: mappedMethodHandlers(protocolHandlers), allowMethod: sortedAllowMethodValue(protocolHandlers), acceptPost: sortedAcceptPostValue(protocolHandlers), + + experimental: config.Experimental, } } diff --git a/option.go b/option.go index 703f8a58..55ccd8dd 100644 --- a/option.go +++ b/option.go @@ -230,6 +230,10 @@ func WithCodec(codec Codec) Option { return &codecOption{Codec: codec} } +func WithExperimental(features ExperimentalFeatures) Option { + return &experimentalOption{features} +} + // WithCompressMinBytes sets a minimum size threshold for compression: // regardless of compressor configuration, messages smaller than the configured // minimum are sent uncompressed. @@ -401,6 +405,18 @@ func (o *clientOptionsOption) applyToClient(config *clientConfig) { } } +type experimentalOption struct { + features ExperimentalFeatures +} + +func (o *experimentalOption) applyToClient(config *clientConfig) { + config.Experimental = o.features +} + +func (o *experimentalOption) applyToHandler(config *handlerConfig) { + config.Experimental = o.features +} + type codecOption struct { Codec Codec } diff --git a/protocol.go b/protocol.go index 9add614c..1005b51f 100644 --- a/protocol.go +++ b/protocol.go @@ -83,6 +83,8 @@ type protocolHandlerParams struct { SendMaxBytes int RequireConnectProtocolHeader bool IdempotencyLevel IdempotencyLevel + + Experimental ExperimentalFeatures } // Handler is the server side of a protocol. HTTP handlers typically support @@ -132,6 +134,8 @@ type protocolClientParams struct { // The gRPC family of protocols always needs access to a Protobuf codec to // marshal and unmarshal errors. Protobuf Codec + + Experimental ExperimentalFeatures } // Client is the client side of a protocol. HTTP clients typically use a single diff --git a/protocol_connect.go b/protocol_connect.go index 6828ab4d..5111c775 100644 --- a/protocol_connect.go +++ b/protocol_connect.go @@ -360,7 +360,14 @@ func (c *connectClient) NewConn( } // else effectively unbounded } } - duplexCall := newDuplexHTTPCall(ctx, c.HTTPClient, c.URL, spec, header) + duplexCall := newDuplexHTTPCall( + ctx, + c.HTTPClient, + c.URL, + spec, + header, + c.Experimental, + ) var conn streamingClientConn if spec.StreamType == StreamTypeUnary { unaryConn := &connectUnaryClientConn{ diff --git a/protocol_grpc.go b/protocol_grpc.go index e10ecad7..842f0b3f 100644 --- a/protocol_grpc.go +++ b/protocol_grpc.go @@ -214,6 +214,7 @@ func (g *grpcHandler) NewConn( }, web: g.web, }, + experimental: g.Experimental, }) if failed != nil { // Negotiation failed, so we can't establish a stream. @@ -280,6 +281,7 @@ func (g *grpcClient) NewConn( g.URL, spec, header, + g.Experimental, ) conn := &grpcClientConn{ spec: spec, @@ -466,6 +468,8 @@ type grpcHandlerConn struct { wroteToBody bool request *http.Request unmarshaler grpcUnmarshaler + + experimental ExperimentalFeatures } func (hc *grpcHandlerConn) Spec() Spec { @@ -515,9 +519,12 @@ func (hc *grpcHandlerConn) Close(err error) (retErr error) { // a well-intentioned client may just not expect the server to be returning // an error for a streaming RPC. Better to accept that we can't always reuse // TCP connections. - closeErr := hc.request.Body.Close() - if retErr == nil { - retErr = closeErr + + if !hc.experimental.AllowBidiStreamOverHTTP11 || hc.request.ProtoMajor > 1 { + closeErr := hc.request.Body.Close() + if retErr == nil { + retErr = closeErr + } } }() defer flushResponseWriter(hc.responseWriter)