Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"io"
"net/http"
"net/url"
"time"
)

// Version is the semantic version of the connect module.
Expand Down Expand Up @@ -319,6 +320,8 @@ type Spec struct {
Procedure string // for example, "/acme.foo.v1.FooService/Bar"
IsClient bool // otherwise we're in a handler
IdempotencyLevel IdempotencyLevel
ReadTimeout time.Duration
WriteTimeout time.Duration
}

// Peer describes the other party to an RPC.
Expand Down
11 changes: 11 additions & 0 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package connect
import (
"context"
"net/http"
"time"
)

// A Handler is the server-side implementation of a single RPC defined by a
Expand Down Expand Up @@ -255,6 +256,12 @@ func NewBidiStreamHandler[Req, Res any](

// ServeHTTP implements [http.Handler].
func (h *Handler) ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) {
if h.spec.ReadTimeout != 0 {
rc := http.NewResponseController(responseWriter)
rc.SetReadDeadline(time.Now().Add(h.spec.ReadTimeout))
rc.SetWriteDeadline(time.Now().Add(h.spec.WriteTimeout))
}

// We don't need to defer functions to close the request body or read to
// EOF: the stream we construct later on already does that, and we only
// return early when dealing with misbehaving clients. In those cases, it's
Expand Down Expand Up @@ -350,6 +357,8 @@ type handlerConfig struct {
ReadMaxBytes int
SendMaxBytes int
StreamType StreamType
ReadTimeout time.Duration
WriteTimeout time.Duration
}

func newHandlerConfig(procedure string, streamType StreamType, options []HandlerOption) *handlerConfig {
Expand All @@ -376,6 +385,8 @@ func (c *handlerConfig) newSpec() Spec {
Schema: c.Schema,
StreamType: c.StreamType,
IdempotencyLevel: c.IdempotencyLevel,
ReadTimeout: c.ReadTimeout,
WriteTimeout: c.WriteTimeout,
}
}

Expand Down
21 changes: 21 additions & 0 deletions option.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"io"
"net/http"
"time"
)

// A ClientOption configures a [Client].
Expand Down Expand Up @@ -351,6 +352,14 @@ func WithInterceptors(interceptors ...Interceptor) Option {
return &interceptorsOption{interceptors}
}

func WithReadTimeout(value time.Duration) HandlerOption {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documentation comment is required for exported methods.

return &readTimeoutOption{value: value}
}

func WithWriteTimeout(value time.Duration) HandlerOption {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

return &writeTimeoutOption{value: value}
}

// WithOptions composes multiple Options into one.
func WithOptions(options ...Option) Option {
return &optionsOption{options}
Expand Down Expand Up @@ -645,3 +654,15 @@ func (o *conditionalHandlerOptions) applyToHandler(config *handlerConfig) {
option.applyToHandler(config)
}
}

type readTimeoutOption struct{ value time.Duration }

func (o *readTimeoutOption) applyToHandler(config *handlerConfig) {
config.ReadTimeout = o.value
}

type writeTimeoutOption struct{ value time.Duration }

func (o *writeTimeoutOption) applyToHandler(config *handlerConfig) {
config.WriteTimeout = o.value
}