Skip to content

Commit dae3a18

Browse files
committed
add http transport options to http processor, http_client input & output
Signed-off-by: Jem Davies <[email protected]>
1 parent e93b7b1 commit dae3a18

File tree

7 files changed

+487
-3
lines changed

7 files changed

+487
-3
lines changed

internal/httpclient/client.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,20 @@ func NewClientFromOldConfig(conf OldConfig, mgr *service.Resources, opts ...Requ
5757
return nil, err
5858
}
5959

60+
var transport *http.Transport
61+
if conf.customTransport {
62+
transport = conf.transport
63+
} else {
64+
if t, ok := http.DefaultTransport.(*http.Transport); ok {
65+
transport = t.Clone()
66+
} else {
67+
return nil, fmt.Errorf("unable to use http.DefaultTransport, unexpected type: %T", http.DefaultTransport)
68+
}
69+
}
70+
6071
h := Client{
6172
reqCreator: reqCreator,
62-
client: &http.Client{},
73+
client: &http.Client{Transport: transport},
6374
metaExtractFilter: conf.ExtractMetadata,
6475

6576
backoffOn: map[int]struct{}{},
@@ -76,7 +87,7 @@ func NewClientFromOldConfig(conf OldConfig, mgr *service.Resources, opts ...Requ
7687
}
7788

7889
if conf.TLSEnabled && conf.TLSConf != nil {
79-
if c, ok := http.DefaultTransport.(*http.Transport); ok {
90+
if c, ok := h.client.Transport.(*http.Transport); ok {
8091
cloned := c.Clone()
8192
cloned.TLSClientConfig = conf.TLSConf
8293
h.client.Transport = cloned
@@ -94,7 +105,9 @@ func NewClientFromOldConfig(conf OldConfig, mgr *service.Resources, opts ...Requ
94105
}
95106
if h.client.Transport != nil {
96107
if tr, ok := h.client.Transport.(*http.Transport); ok {
97-
tr.Proxy = http.ProxyURL(proxyURL)
108+
cloned := tr.Clone()
109+
cloned.Proxy = http.ProxyURL(proxyURL)
110+
h.client.Transport = cloned
98111
} else {
99112
return nil, fmt.Errorf("unable to apply proxy_url to transport, unexpected type %T", h.client.Transport)
100113
}

internal/httpclient/config.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"net/http"
88
"time"
99

10+
"github.com/warpstreamlabs/bento/internal/httptransport"
1011
"github.com/warpstreamlabs/bento/public/service"
1112
)
1213

@@ -27,6 +28,7 @@ const (
2728
hcFieldDumpRequestLogLevel = "dump_request_log_level"
2829
hcFieldTLS = "tls"
2930
hcFieldProxyURL = "proxy_url"
31+
hcFieldTransport = "transport"
3032
)
3133

3234
// ConfigField returns a public API config field spec for an HTTP component,
@@ -101,6 +103,7 @@ func ConfigField(defaultVerb string, forOutput bool, extraChildren ...*service.C
101103
Description("An optional HTTP proxy URL.").
102104
Advanced().
103105
Optional(),
106+
httptransport.CustomTransportConfigSpec(),
104107
)
105108

106109
innerFields = append(innerFields, extraChildren...)
@@ -160,6 +163,10 @@ func ConfigFromParsed(pConf *service.ParsedConfig) (conf OldConfig, err error) {
160163
if conf.clientCtor, err = oauth2ClientCtorFromParsed(pConf); err != nil {
161164
return
162165
}
166+
if conf.transport, conf.customTransport, err = pConf.FieldHTTPTransport(hcFieldTransport); err != nil {
167+
return
168+
}
169+
163170
return
164171
}
165172

@@ -184,4 +191,7 @@ type OldConfig struct {
184191
ProxyURL string
185192
authSigner func(f fs.FS, req *http.Request) error
186193
clientCtor func(context.Context, *http.Client) *http.Client
194+
195+
transport *http.Transport
196+
customTransport bool
187197
}

internal/httptransport/config.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package httptransport
2+
3+
import (
4+
"time"
5+
6+
"github.com/warpstreamlabs/bento/public/service"
7+
)
8+
9+
const (
10+
hcFieldCustomTransportEnabled = "enabled"
11+
hcFieldCustomTransport = "custom_transport"
12+
hcFieldDialContext = "dial_context"
13+
hcFieldDialContextTimeout = "timeout"
14+
hcFieldDialContextKeepAlive = "keep_alive"
15+
hcFieldForceAttemptHTTP2 = "force_http2"
16+
hcFieldMaxIdleConns = "max_idle_connections"
17+
hcFieldIdleConnTimeout = "idle_connection_timeout"
18+
hcFieldTLSHandshakeTimeout = "tls_handshake_timeout"
19+
hcFieldExpectContinueTimeout = "expect_continue_timeout"
20+
)
21+
22+
func CustomTransportConfigSpec() *service.ConfigField {
23+
return service.NewObjectField(hcFieldCustomTransport,
24+
service.NewBoolField(hcFieldCustomTransportEnabled).
25+
Description("Enables a custom HTTP transport. When set to `false` (default), Bento will use Go's [DefaultTransport](https://pkg.go.dev/net/http#DefaultTransport) for the underlying net/http transport. When enabled, settings from the `custom_transport` fields will be applied to the underlying transport. Note that other fields that modify the transport, such as TLS & ProxyURL, will always apply to the transport. The Env Var 'BENTO_OVERRIDE_DEFAULT_HTTP_TRANSPORT' can also be used to ensure the DefaultTransport isn't used.").
26+
Advanced().
27+
Version("1.13.0").
28+
Default(false),
29+
service.NewObjectField(hcFieldDialContext,
30+
service.NewDurationField(hcFieldDialContextTimeout).
31+
Description("Timeout for establishing new network connections.").
32+
Advanced().
33+
Version("1.13.0").
34+
Default("30s"),
35+
service.NewDurationField(hcFieldDialContextKeepAlive).
36+
Description("Keep-alive period for an active network connections used by the dialer.").
37+
Advanced().
38+
Version("1.13.0").
39+
Default("30s"),
40+
).
41+
Description("Settings for the dialer used to create new connections.").
42+
Advanced().
43+
Version("1.13.0").
44+
Optional(),
45+
service.NewBoolField(hcFieldForceAttemptHTTP2).
46+
Description("If true, the transport will attempt to use HTTP/2.").
47+
Advanced().
48+
Version("1.13.0").
49+
Default(true),
50+
service.NewIntField(hcFieldMaxIdleConns).
51+
Description("Controls the maximum number of idle (keep-alive) connections across all hosts. Zero means no limit.").
52+
Advanced().
53+
Version("1.13.0").
54+
Default(100),
55+
service.NewDurationField(hcFieldIdleConnTimeout).
56+
Description("Maximum amount of time an idle (keep-alive) connection will remain idle before closing itself.").
57+
Advanced().
58+
Version("1.13.0").
59+
Default("90s"),
60+
service.NewDurationField(hcFieldTLSHandshakeTimeout).
61+
Description("Maximum time allowed for the TLS handshake to complete when establishing connections.").
62+
Advanced().
63+
Version("1.13.0").
64+
Default("10s"),
65+
service.NewDurationField(hcFieldExpectContinueTimeout).
66+
Description("Time to wait for a server's first response headers after fully writing the request headers if the request has an 'Expect: 100-continue' header. Zero means no timeout and causes the body to be sent immediately, without waiting for the server to approve.").
67+
Advanced().
68+
Version("1.13.0").
69+
Default("1s"),
70+
).
71+
Description("Custom transport options.").
72+
Advanced()
73+
}
74+
75+
type CustomTransport struct {
76+
CustomTransportEnabled bool
77+
DialContextTimeout time.Duration
78+
DialContextKeepAlive time.Duration
79+
ForceAttemptHTTP2 bool
80+
MaxIdleConns int
81+
IdleConnTimeout time.Duration
82+
TlsHandshakeTimeout time.Duration
83+
ExpectContinueTimeout time.Duration
84+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package service
2+
3+
import (
4+
"net"
5+
"net/http"
6+
"os"
7+
)
8+
9+
const (
10+
fieldCustomTransportEnabled = "enabled"
11+
fieldDialContext = "dial_context"
12+
fieldDialContextTimeout = "timeout"
13+
fieldDialContextKeepAlive = "keep_alive"
14+
fieldForceAttemptHTTP2 = "force_http2"
15+
fieldMaxIdleConns = "max_idle_connections"
16+
fieldIdleConnTimeout = "idle_connection_timeout"
17+
fieldTLSHandshakeTimeout = "tls_handshake_timeout"
18+
fieldExpectContinueTimeout = "expect_continue_timeout"
19+
)
20+
21+
var overrideDefaultHTTPTransport bool = os.Getenv("BENTO_OVERRIDE_DEFAULT_HTTP_TRANSPORT") == "true"
22+
23+
// FieldHTTPTransport constructs an *http.Transport based on configuration fields found at the given path.
24+
// It returns the transport, a boolean indicating if a custom transport is enabled, and an error if any configuration is invalid.
25+
// The default transport can be overridden by setting the BENTO_OVERRIDE_DEFAULT_HTTP_TRANSPORT environment variable to "true".
26+
func (pConf *ParsedConfig) FieldHTTPTransport(path ...string) (transport *http.Transport, customTransportEnabled bool, err error) {
27+
tranPConf := pConf.Namespace(path...)
28+
29+
customTransportEnabled, err = tranPConf.FieldBool(fieldCustomTransportEnabled)
30+
if err != nil {
31+
return
32+
}
33+
if !customTransportEnabled && !overrideDefaultHTTPTransport {
34+
return nil, false, nil
35+
}
36+
37+
dialContextTimeout, err := tranPConf.FieldDuration([]string{fieldDialContext, fieldDialContextTimeout}...)
38+
if err != nil {
39+
return
40+
}
41+
42+
dialContextKeepAlive, err := tranPConf.FieldDuration([]string{fieldDialContext, fieldDialContextKeepAlive}...)
43+
if err != nil {
44+
return
45+
}
46+
47+
forceAttemptHTTP2, err := tranPConf.FieldBool(fieldForceAttemptHTTP2)
48+
if err != nil {
49+
return
50+
}
51+
52+
maxIdleConns, err := tranPConf.FieldInt(fieldMaxIdleConns)
53+
if err != nil {
54+
return
55+
}
56+
57+
idleConnTimeout, err := tranPConf.FieldDuration(fieldIdleConnTimeout)
58+
if err != nil {
59+
return
60+
}
61+
62+
tlsHandshakeTimeout, err := tranPConf.FieldDuration(fieldTLSHandshakeTimeout)
63+
if err != nil {
64+
return
65+
}
66+
67+
expectContinueTimeout, err := tranPConf.FieldDuration(fieldExpectContinueTimeout)
68+
if err != nil {
69+
return
70+
}
71+
72+
transport = &http.Transport{
73+
Proxy: http.ProxyFromEnvironment,
74+
DialContext: (&net.Dialer{
75+
Timeout: dialContextTimeout,
76+
KeepAlive: dialContextKeepAlive,
77+
}).DialContext,
78+
ForceAttemptHTTP2: forceAttemptHTTP2,
79+
MaxIdleConns: maxIdleConns,
80+
IdleConnTimeout: idleConnTimeout,
81+
TLSHandshakeTimeout: tlsHandshakeTimeout,
82+
ExpectContinueTimeout: expectContinueTimeout,
83+
}
84+
85+
return transport, true, nil
86+
}

website/docs/components/inputs/http_client.md

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,16 @@ input:
102102
drop_on: []
103103
successful_on: []
104104
proxy_url: "" # No default (optional)
105+
custom_transport:
106+
enabled: false
107+
dial_context:
108+
timeout: 30s
109+
keep_alive: 30s
110+
force_http2: true
111+
max_idle_connections: 100
112+
idle_connection_timeout: 90s
113+
tls_handshake_timeout: 10s
114+
expect_continue_timeout: 1s
105115
payload: "" # No default (optional)
106116
drop_empty_bodies: true
107117
stream:
@@ -724,6 +734,93 @@ An optional HTTP proxy URL.
724734

725735
Type: `string`
726736

737+
### `custom_transport`
738+
739+
Custom transport options.
740+
741+
742+
Type: `object`
743+
744+
### `custom_transport.enabled`
745+
746+
Enables a custom HTTP transport. When set to `false` (default), Bento will use Go's [DefaultTransport](https://pkg.go.dev/net/http#DefaultTransport) for the underlying net/http transport. When enabled, settings from the `custom_transport` fields will be applied to the underlying transport. Note that other fields that modify the transport, such as TLS & ProxyURL, will always apply to the transport. The Env Var 'BENTO_OVERRIDE_DEFAULT_HTTP_TRANSPORT' can also be used to ensure the DefaultTransport isn't used.
747+
748+
749+
Type: `bool`
750+
Default: `false`
751+
Requires version 1.13.0 or newer
752+
753+
### `custom_transport.dial_context`
754+
755+
Settings for the dialer used to create new connections.
756+
757+
758+
Type: `object`
759+
Requires version 1.13.0 or newer
760+
761+
### `custom_transport.dial_context.timeout`
762+
763+
Timeout for establishing new network connections.
764+
765+
766+
Type: `string`
767+
Default: `"30s"`
768+
Requires version 1.13.0 or newer
769+
770+
### `custom_transport.dial_context.keep_alive`
771+
772+
Keep-alive period for an active network connections used by the dialer.
773+
774+
775+
Type: `string`
776+
Default: `"30s"`
777+
Requires version 1.13.0 or newer
778+
779+
### `custom_transport.force_http2`
780+
781+
If true, the transport will attempt to use HTTP/2.
782+
783+
784+
Type: `bool`
785+
Default: `true`
786+
Requires version 1.13.0 or newer
787+
788+
### `custom_transport.max_idle_connections`
789+
790+
Controls the maximum number of idle (keep-alive) connections across all hosts. Zero means no limit.
791+
792+
793+
Type: `int`
794+
Default: `100`
795+
Requires version 1.13.0 or newer
796+
797+
### `custom_transport.idle_connection_timeout`
798+
799+
Maximum amount of time an idle (keep-alive) connection will remain idle before closing itself.
800+
801+
802+
Type: `string`
803+
Default: `"90s"`
804+
Requires version 1.13.0 or newer
805+
806+
### `custom_transport.tls_handshake_timeout`
807+
808+
Maximum time allowed for the TLS handshake to complete when establishing connections.
809+
810+
811+
Type: `string`
812+
Default: `"10s"`
813+
Requires version 1.13.0 or newer
814+
815+
### `custom_transport.expect_continue_timeout`
816+
817+
Time to wait for a server's first response headers after fully writing the request headers if the request has an 'Expect: 100-continue' header. Zero means no timeout and causes the body to be sent immediately, without waiting for the server to approve.
818+
819+
820+
Type: `string`
821+
Default: `"1s"`
822+
Requires version 1.13.0 or newer
823+
727824
### `payload`
728825

729826
An optional payload to deliver for each request.

0 commit comments

Comments
 (0)