Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ func NewClient[Req, Res any](httpClient HTTPClient, url string, options ...Clien
EnableGet: config.EnableGet,
GetURLMaxBytes: config.GetURLMaxBytes,
GetUseFallback: config.GetUseFallback,

Experimental: config.Experimental,
},
)
if protocolErr != nil {
Expand Down Expand Up @@ -213,6 +215,8 @@ type clientConfig struct {
GetURLMaxBytes int
GetUseFallback bool
IdempotencyLevel IdempotencyLevel

Experimental ExperimentalFeatures
}

func newClientConfig(rawURL string, options []ClientOption) (*clientConfig, *Error) {
Expand All @@ -227,6 +231,7 @@ func newClientConfig(rawURL string, options []ClientOption) (*clientConfig, *Err
Procedure: protoPath,
CompressionPools: make(map[string]*compressionPool),
BufferPool: newBufferPool(),
Experimental: DefaultExperimentalFeatures,
}
withProtoBinaryCodec().applyToClient(&config)
withGzip().applyToClient(&config)
Expand Down
130 changes: 110 additions & 20 deletions connect_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2169,28 +2169,118 @@ func TestWebXUserAgent(t *testing.T) {

func TestBidiOverHTTP1(t *testing.T) {
t.Parallel()
mux := http.NewServeMux()
mux.Handle(pingv1connect.NewPingServiceHandler(pingServer{}))
server := memhttptest.NewServer(t, mux)

// Clients expecting a full-duplex connection that end up with a simplex
// HTTP/1.1 connection shouldn't hang. Instead, the server should close the
// TCP connection.
client := pingv1connect.NewPingServiceClient(
&http.Client{Transport: server.TransportHTTP1()},
server.URL(),
)
stream := client.CumSum(context.Background())
// Stream creates an async request, can error on Send or Receive.
if err := stream.Send(&pingv1.CumSumRequest{Number: 2}); err != nil {
assert.ErrorIs(t, err, io.EOF)
type testCase struct {
name string
handlerOptions []connect.HandlerOption
clientOptions []connect.ClientOption
validate func(*testing.T, *pingv1.CumSumResponse, error)
}

for _, tc := range []testCase{
{
name: "disallow bidi stream over http1",
handlerOptions: []connect.HandlerOption{
connect.WithExperimental(connect.ExperimentalFeatures{
AllowBidiStreamOverHTTP11: false,
}),
},
clientOptions: []connect.ClientOption{
connect.WithExperimental(connect.ExperimentalFeatures{
AllowBidiStreamOverHTTP11: false,
}),
},
validate: func(t *testing.T, csr *pingv1.CumSumResponse, err error) {
assert.Equal(t, connect.CodeOf(err), connect.CodeUnknown)
assert.NotNil(t, err)
assert.Equal(t, err.Error(), "unknown: HTTP status 505 HTTP Version Not Supported")
},
},
{
name: "allow bidi stream over http1",
handlerOptions: []connect.HandlerOption{
connect.WithExperimental(connect.ExperimentalFeatures{
AllowBidiStreamOverHTTP11: true,
}),
},
clientOptions: []connect.ClientOption{
connect.WithExperimental(connect.ExperimentalFeatures{
AllowBidiStreamOverHTTP11: true,
}),
},
validate: func(t *testing.T, csr *pingv1.CumSumResponse, err error) {
assert.NotNil(t, csr)
assert.Equal(t, 2, csr.Sum)
assert.Nil(t, err)
},
},
{
name: "allow bidi stream over http1 grpc",
handlerOptions: []connect.HandlerOption{
connect.WithExperimental(connect.ExperimentalFeatures{
AllowBidiStreamOverHTTP11: true,
}),
},
clientOptions: []connect.ClientOption{
connect.WithExperimental(connect.ExperimentalFeatures{
AllowBidiStreamOverHTTP11: true,
}),
connect.WithGRPC(),
},
validate: func(t *testing.T, csr *pingv1.CumSumResponse, err error) {
assert.NotNil(t, csr)
assert.Equal(t, 2, csr.Sum)
assert.Nil(t, err)
},
},
{
name: "allow bidi stream over http1 grpc-web",
handlerOptions: []connect.HandlerOption{
connect.WithExperimental(connect.ExperimentalFeatures{
AllowBidiStreamOverHTTP11: true,
}),
},
clientOptions: []connect.ClientOption{
connect.WithExperimental(connect.ExperimentalFeatures{
AllowBidiStreamOverHTTP11: true,
}),
connect.WithGRPCWeb(),
},
validate: func(t *testing.T, csr *pingv1.CumSumResponse, err error) {
assert.NotNil(t, csr)
assert.Equal(t, 2, csr.Sum)
assert.Nil(t, err)
},
},
} {
t.Run(tc.name, func(t *testing.T) {
mux := http.NewServeMux()
mux.Handle(pingv1connect.NewPingServiceHandler(
pingServer{},
tc.handlerOptions...,
))
server := memhttptest.NewServer(t, mux)

// Clients expecting a full-duplex connection that end up with a simplex
// HTTP/1.1 connection shouldn't hang. Instead, the server should close the
// TCP connection.
client := pingv1connect.NewPingServiceClient(
&http.Client{Transport: server.TransportHTTP1()},
server.URL(),
tc.clientOptions...,
)
stream := client.CumSum(context.Background())
// Stream creates an async request, can error on Send or Receive.
if err := stream.Send(&pingv1.CumSumRequest{Number: 2}); err != nil {
assert.ErrorIs(t, err, io.EOF)
}
recvMsg, err := stream.Receive()
tc.validate(t, recvMsg, err)

assert.Nil(t, stream.CloseRequest())
assert.Nil(t, stream.CloseResponse())
})
}
_, err := stream.Receive()
assert.NotNil(t, err)
assert.Equal(t, connect.CodeOf(err), connect.CodeUnknown)
assert.Equal(t, err.Error(), "unknown: HTTP status 505 HTTP Version Not Supported")
assert.Nil(t, stream.CloseRequest())
assert.Nil(t, stream.CloseResponse())
}

func TestHandlerReturnsNilResponse(t *testing.T) {
Expand Down
30 changes: 19 additions & 11 deletions duplex_http_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type duplexHTTPCall struct {
responseReady chan struct{}
response *http.Response
responseErr error

allowBidiStreamOverHTTP11 bool
}

func newDuplexHTTPCall(
Expand All @@ -58,6 +60,7 @@ func newDuplexHTTPCall(
url *url.URL,
spec Spec,
header http.Header,
features ExperimentalFeatures,
) *duplexHTTPCall {
// ensure we make a copy of the url before we pass along to the
// Request. This ensures if a transport out of our control wants
Expand Down Expand Up @@ -87,6 +90,8 @@ func newDuplexHTTPCall(
streamType: spec.StreamType,
request: request,
responseReady: make(chan struct{}),

allowBidiStreamOverHTTP11: features.AllowBidiStreamOverHTTP11,
}
}

Expand Down Expand Up @@ -326,18 +331,21 @@ func (d *duplexHTTPCall) makeRequest() {
_ = d.CloseWrite()
return
}
if (d.streamType&StreamTypeBidi) == StreamTypeBidi && response.ProtoMajor < 2 {
// If we somehow dialed an HTTP/1.x server, fail with an explicit message
// rather than returning a more cryptic error later on.
d.responseErr = errorf(
CodeUnimplemented,
"response from %v is HTTP/%d.%d: bidi streams require at least HTTP/2",
d.request.URL,
response.ProtoMajor,
response.ProtoMinor,
)
_ = d.CloseWrite()
if !d.allowBidiStreamOverHTTP11 {
if (d.streamType&StreamTypeBidi) == StreamTypeBidi && response.ProtoMajor < 2 {
// If we somehow dialed an HTTP/1.x server, fail with an explicit message
// rather than returning a more cryptic error later on.
d.responseErr = errorf(
CodeUnimplemented,
"response from %v is HTTP/%d.%d: bidi streams require at least HTTP/2",
d.request.URL,
response.ProtoMajor,
response.ProtoMinor,
)
_ = d.CloseWrite()
}
}

}

// getNoBody is a GetBody function for http.NoBody.
Expand Down
1 change: 1 addition & 0 deletions duplex_http_call_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func TestHTTPCallGetBody(t *testing.T) {
serverURL,
Spec{StreamType: StreamTypeUnary},
http.Header{},
ExperimentalFeatures{},
)
getBodyCalled := false
call.onRequestSend = func(*http.Request) {
Expand Down
9 changes: 9 additions & 0 deletions features.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package connect

type ExperimentalFeatures struct {
AllowBidiStreamOverHTTP11 bool
}

var DefaultExperimentalFeatures = ExperimentalFeatures{
AllowBidiStreamOverHTTP11: true,
}
19 changes: 19 additions & 0 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Handler struct {
protocolHandlers map[string][]protocolHandler // Method to protocol handlers
allowMethod string // Allow header
acceptPost string // Accept-Post header
experimental ExperimentalFeatures
}

// NewUnaryHandler constructs a [Handler] for a request-response procedure.
Expand Down Expand Up @@ -82,6 +83,8 @@ func NewUnaryHandler[Req, Res any](
protocolHandlers: mappedMethodHandlers(protocolHandlers),
allowMethod: sortedAllowMethodValue(protocolHandlers),
acceptPost: sortedAcceptPostValue(protocolHandlers),

experimental: config.Experimental,
}
}

Expand Down Expand Up @@ -163,12 +166,22 @@ func (h *Handler) ServeHTTP(responseWriter http.ResponseWriter, request *http.Re
// okay if we can't re-use the connection.
isBidi := (h.spec.StreamType & StreamTypeBidi) == StreamTypeBidi
if isBidi && request.ProtoMajor < 2 {
// Check if we allow bidi stream over HTTP/1.1, enable full-duplex support
// and if fail, we fallback to the default behaviour.
if h.experimental.AllowBidiStreamOverHTTP11 {
responseController := http.NewResponseController(responseWriter)
if err := responseController.EnableFullDuplex(); err == nil {
goto Pass
}
}

// Clients coded to expect full-duplex connections may hang if they've
// mistakenly negotiated HTTP/1.1. To unblock them, we must close the
// underlying TCP connection.
responseWriter.Header().Set("Connection", "close")
responseWriter.WriteHeader(http.StatusHTTPVersionNotSupported)
return
Pass:
}

protocolHandlers := h.protocolHandlers[request.Method]
Expand Down Expand Up @@ -252,6 +265,8 @@ type handlerConfig struct {
ReadMaxBytes int
SendMaxBytes int
StreamType StreamType

Experimental ExperimentalFeatures
}

func newHandlerConfig(procedure string, streamType StreamType, options []HandlerOption) *handlerConfig {
Expand All @@ -262,6 +277,7 @@ func newHandlerConfig(procedure string, streamType StreamType, options []Handler
Codecs: make(map[string]Codec),
BufferPool: newBufferPool(),
StreamType: streamType,
Experimental: DefaultExperimentalFeatures,
}
withProtoBinaryCodec().applyToHandler(&config)
withProtoJSONCodecs().applyToHandler(&config)
Expand Down Expand Up @@ -304,6 +320,7 @@ func (c *handlerConfig) newProtocolHandlers() []protocolHandler {
SendMaxBytes: c.SendMaxBytes,
RequireConnectProtocolHeader: c.RequireConnectProtocolHeader,
IdempotencyLevel: c.IdempotencyLevel,
Experimental: c.Experimental,
}))
}
return handlers
Expand All @@ -323,5 +340,7 @@ func newStreamHandler(
protocolHandlers: mappedMethodHandlers(protocolHandlers),
allowMethod: sortedAllowMethodValue(protocolHandlers),
acceptPost: sortedAcceptPostValue(protocolHandlers),

experimental: config.Experimental,
}
}
16 changes: 16 additions & 0 deletions option.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ func WithCodec(codec Codec) Option {
return &codecOption{Codec: codec}
}

func WithExperimental(features ExperimentalFeatures) Option {
return &experimentalOption{features}
}

// WithCompressMinBytes sets a minimum size threshold for compression:
// regardless of compressor configuration, messages smaller than the configured
// minimum are sent uncompressed.
Expand Down Expand Up @@ -401,6 +405,18 @@ func (o *clientOptionsOption) applyToClient(config *clientConfig) {
}
}

type experimentalOption struct {
features ExperimentalFeatures
}

func (o *experimentalOption) applyToClient(config *clientConfig) {
config.Experimental = o.features
}

func (o *experimentalOption) applyToHandler(config *handlerConfig) {
config.Experimental = o.features
}

type codecOption struct {
Codec Codec
}
Expand Down
4 changes: 4 additions & 0 deletions protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ type protocolHandlerParams struct {
SendMaxBytes int
RequireConnectProtocolHeader bool
IdempotencyLevel IdempotencyLevel

Experimental ExperimentalFeatures
}

// Handler is the server side of a protocol. HTTP handlers typically support
Expand Down Expand Up @@ -132,6 +134,8 @@ type protocolClientParams struct {
// The gRPC family of protocols always needs access to a Protobuf codec to
// marshal and unmarshal errors.
Protobuf Codec

Experimental ExperimentalFeatures
}

// Client is the client side of a protocol. HTTP clients typically use a single
Expand Down
9 changes: 8 additions & 1 deletion protocol_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,14 @@ func (c *connectClient) NewConn(
} // else effectively unbounded
}
}
duplexCall := newDuplexHTTPCall(ctx, c.HTTPClient, c.URL, spec, header)
duplexCall := newDuplexHTTPCall(
ctx,
c.HTTPClient,
c.URL,
spec,
header,
c.Experimental,
)
var conn streamingClientConn
if spec.StreamType == StreamTypeUnary {
unaryConn := &connectUnaryClientConn{
Expand Down
Loading