Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: configurable transformer http client in processor, router #5455

Draft
wants to merge 10 commits into
base: chore.configurableCSLB
Choose a base branch
from
142 changes: 142 additions & 0 deletions internal/transformer-client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package transformerclient

import (
"context"
"github.com/bufbuild/httplb/resolver"
"net"
"net/http"
"time"

"github.com/bufbuild/httplb"
"github.com/bufbuild/httplb/conn"
"github.com/bufbuild/httplb/health"
"github.com/bufbuild/httplb/picker"

"github.com/rudderlabs/rudder-server/utils/sysUtils"
)

// ClientConfig carries the settings NewClient uses to build a transformer
// HTTP client. For every numeric and duration field the zero value means
// "keep NewClient's built-in default".
type ClientConfig struct {
	// TransportConfig tunes the underlying http.Transport.
	TransportConfig struct {
		// DisableKeepAlives is copied as-is onto the transport whenever a
		// non-nil config is supplied, so a zero-value config enables
		// keep-alives. With a nil config NewClient uses true.
		DisableKeepAlives bool
		// MaxConnsPerHost falls back to 100 when zero.
		MaxConnsPerHost int
		// MaxIdleConnsPerHost falls back to 10 when zero.
		MaxIdleConnsPerHost int
		// IdleConnTimeout falls back to 30*time.Second when zero.
		IdleConnTimeout time.Duration
	}

	// ClientTimeout is the overall request timeout; falls back to
	// 600*time.Second when zero.
	ClientTimeout time.Duration
	// ClientTTL falls back to 120*time.Second when zero. NewClient passes it
	// to the recycled client as its TTL and to the httplb DNS resolver.
	ClientTTL time.Duration

	// ClientType selects the implementation: "stdlib", "recycled" or
	// "httplb". Any other value (including "") yields the stdlib client.
	ClientType string

	// PickerType selects the httplb picker: "power_of_two", "round_robin",
	// "least_loaded_random", "least_loaded_round_robin" or "random". Any
	// other value yields power_of_two.
	PickerType string

	// CheckerType selects the httplb health checker: "nop" or "polling".
	// Any other value yields the nop checker, and NewClient forces "nop"
	// when CheckURL is empty.
	CheckerType string
	// CheckURL is handed to the polling health prober.
	CheckURL string
}

// Client is the minimal HTTP surface returned by NewClient; every client
// flavour it builds is used solely through this method.
type Client interface {
	// Do sends the request and returns the response or an error, mirroring
	// the signature of (*http.Client).Do.
	Do(request *http.Request) (*http.Response, error)
}

func NewClient(config *ClientConfig) Client {
transport := &http.Transport{
DisableKeepAlives: true,
MaxConnsPerHost: 100,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 30 * time.Second,
}
client := &http.Client{
Transport: transport,
Timeout: 600 * time.Second,
}
if config == nil {
return client
}

Check warning on line 54 in internal/transformer-client/client.go

View check run for this annotation

Codecov / codecov/patch

internal/transformer-client/client.go#L53-L54

Added lines #L53 - L54 were not covered by tests

if !config.TransportConfig.DisableKeepAlives {
transport.DisableKeepAlives = false
}

Check warning on line 58 in internal/transformer-client/client.go

View check run for this annotation

Codecov / codecov/patch

internal/transformer-client/client.go#L57-L58

Added lines #L57 - L58 were not covered by tests
if config.TransportConfig.MaxConnsPerHost != 0 {
transport.MaxConnsPerHost = config.TransportConfig.MaxConnsPerHost
}
if config.TransportConfig.MaxIdleConnsPerHost != 0 {
transport.MaxIdleConnsPerHost = config.TransportConfig.MaxIdleConnsPerHost
}
if config.TransportConfig.IdleConnTimeout != 0 {
transport.IdleConnTimeout = config.TransportConfig.IdleConnTimeout
}

if config.ClientTimeout != 0 {
client.Timeout = config.ClientTimeout
}

clientTTL := 120 * time.Second
if config.ClientTTL != 0 {
clientTTL = config.ClientTTL
}

switch config.ClientType {
case "stdlib":
return client
case "recycled":
return sysUtils.NewRecycledHTTPClient(func() *http.Client {
return client
}, clientTTL)
case "httplb":
checkerType := config.CheckerType
if config.CheckURL == "" {
checkerType = "nop"
}
return httplb.NewClient(
httplb.WithRootContext(context.TODO()),
httplb.WithPicker(getPicker(config.PickerType)),
httplb.WithHealthChecks(getChecker(checkerType, config.CheckURL)),
httplb.WithIdleConnectionTimeout(transport.IdleConnTimeout),
httplb.WithRequestTimeout(client.Timeout),
httplb.WithRoundTripperMaxLifetime(transport.IdleConnTimeout),
httplb.WithIdleTransportTimeout(2*transport.IdleConnTimeout),
httplb.WithResolver(resolver.NewDNSResolver(net.DefaultResolver, resolver.PreferIPv4, clientTTL)),
)
default:
return client

Check warning on line 101 in internal/transformer-client/client.go

View check run for this annotation

Codecov / codecov/patch

internal/transformer-client/client.go#L81-L101

Added lines #L81 - L101 were not covered by tests
}
}

func getPicker(pickerType string) func(prev picker.Picker, allConns conn.Conns) picker.Picker {
switch pickerType {
case "power_of_two":
return picker.NewPowerOfTwo
case "round_robin":
return picker.NewRoundRobin
case "least_loaded_random":
return picker.NewLeastLoadedRandom
case "least_loaded_round_robin":
return picker.NewLeastLoadedRoundRobin
case "random":
return picker.NewRandom
default:
return picker.NewPowerOfTwo

Check warning on line 118 in internal/transformer-client/client.go

View check run for this annotation

Codecov / codecov/patch

internal/transformer-client/client.go#L105-L118

Added lines #L105 - L118 were not covered by tests
}
}

type HTTPLBTransport struct {
*http.Transport
}

func (t *HTTPLBTransport) NewRoundTripper(scheme, target string, config httplb.TransportConfig) httplb.RoundTripperResult {
return httplb.RoundTripperResult{RoundTripper: t.Transport, Close: t.CloseIdleConnections}

Check warning on line 127 in internal/transformer-client/client.go

View check run for this annotation

Codecov / codecov/patch

internal/transformer-client/client.go#L126-L127

Added lines #L126 - L127 were not covered by tests
}

func getChecker(checkerType, url string) health.Checker {
switch checkerType {
case "nop":
return health.NopChecker
case "polling":
return health.NewPollingChecker(
health.PollingCheckerConfig{},
health.NewSimpleProber(url),
)
default:
return health.NopChecker

Check warning on line 140 in internal/transformer-client/client.go

View check run for this annotation

Codecov / codecov/patch

internal/transformer-client/client.go#L130-L140

Added lines #L130 - L140 were not covered by tests
}
}
122 changes: 36 additions & 86 deletions processor/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
"context"
"fmt"
"io"
"net"
"net/http"
"os"
"runtime/trace"
Expand All @@ -16,11 +15,6 @@
"sync"
"time"

"github.com/bufbuild/httplb"
"github.com/bufbuild/httplb/conn"
"github.com/bufbuild/httplb/health"
"github.com/bufbuild/httplb/picker"
"github.com/bufbuild/httplb/resolver"
"github.com/cenkalti/backoff"
jsoniter "github.com/json-iterator/go"
"github.com/samber/lo"
Expand All @@ -30,9 +24,10 @@
"github.com/rudderlabs/rudder-go-kit/stats"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
transformerclient "github.com/rudderlabs/rudder-server/internal/transformer-client"
"github.com/rudderlabs/rudder-server/processor/integrations"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/sysUtils"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)
Expand Down Expand Up @@ -240,44 +235,21 @@

trans.guardConcurrency = make(chan struct{}, trans.config.maxConcurrency)

clientType := conf.GetString("Transformer.Client.type", "stdlib")
transformerClientConfig := &transformerclient.ClientConfig{
ClientTimeout: trans.config.timeoutDuration,
ClientTTL: config.GetDuration("Transformer.Client.ttl", 120, time.Second),
ClientType: conf.GetString("Transformer.Client.type", "stdlib"),
PickerType: conf.GetString("Transformer.Client.httplb.pickerType", "power_of_two"),
CheckerType: conf.GetString("Transformer.Client.httplb.checkerType", "nop"),

transport := &http.Transport{
DisableKeepAlives: trans.config.disableKeepAlives,
MaxConnsPerHost: trans.config.maxHTTPConnections,
MaxIdleConnsPerHost: trans.config.maxHTTPIdleConnections,
IdleConnTimeout: trans.config.maxIdleConnDuration,
}
client := &http.Client{
Transport: transport,
Timeout: trans.config.timeoutDuration,
}

switch clientType {
case "stdlib":
trans.httpClient = client
case "recycled":
trans.httpClient = sysUtils.NewRecycledHTTPClient(func() *http.Client {
return client
}, config.GetDuration("Transformer.Client.ttl", 120, time.Second))
case "httplb":
trans.httpClient = httplb.NewClient(
httplb.WithPicker(getPicker(conf)),
httplb.WithTransport("http", &HTTPLBTransport{
Transport: transport,
}),
httplb.WithHealthChecks(getChecker(conf, trans.config.userTransformationURL)),
httplb.WithResolver(
resolver.NewDNSResolver(
net.DefaultResolver,
resolver.PreferIPv6,
config.GetDuration("Transformer.Client.ttl", 120, time.Second), // TTL value
),
),
)
default:
panic(fmt.Sprintf("unknown transformer client type: %s", clientType))
//for now no health checks - can be implemented when there are different clients for UT, DT, TPV
CheckURL: trans.config.userTransformationURL,
}
transformerClientConfig.TransportConfig.DisableKeepAlives = trans.config.disableKeepAlives
transformerClientConfig.TransportConfig.MaxConnsPerHost = trans.config.maxHTTPConnections
transformerClientConfig.TransportConfig.MaxIdleConnsPerHost = trans.config.maxHTTPIdleConnections
transformerClientConfig.TransportConfig.IdleConnTimeout = trans.config.maxIdleConnDuration
trans.httpClient = transformerclient.NewClient(transformerClientConfig)

for _, opt := range opts {
opt(&trans)
Expand Down Expand Up @@ -307,47 +279,6 @@
return trans.transform(ctx, clientEvents, trans.trackingPlanValidationURL(), batchSize, trackingPlanValidationStage)
}

type HTTPLBTransport struct {
*http.Transport
}

func (t *HTTPLBTransport) NewRoundTripper(scheme, target string, config httplb.TransportConfig) httplb.RoundTripperResult {
return httplb.RoundTripperResult{RoundTripper: t.Transport, Close: t.CloseIdleConnections}
}

func getPicker(conf *config.Config) func(prev picker.Picker, allConns conn.Conns) picker.Picker {
pickerType := conf.GetString("Transformer.Client.httplb.pickerType", "power_of_two")
switch pickerType {
case "power_of_two":
return picker.NewPowerOfTwo
case "round_robin":
return picker.NewRoundRobin
case "least_loaded_random":
return picker.NewLeastLoadedRandom
case "least_loaded_round_robin":
return picker.NewLeastLoadedRoundRobin
case "random":
return picker.NewRandom
default:
panic(fmt.Sprintf("unknown picker type: %s", pickerType))
}
}

func getChecker(conf *config.Config, url string) health.Checker {
checkerType := conf.GetString("Transformer.Client.httplb.checkerType", "nop")
switch checkerType {
case "nop":
return health.NopChecker
case "polling":
return health.NewPollingChecker(
health.PollingCheckerConfig{},
health.NewSimpleProber(url),
)
default:
panic(fmt.Sprintf("unknown checker type: %s", checkerType))
}
}

func (trans *handle) transform(
ctx context.Context,
clientEvents []TransformerEvent,
Expand Down Expand Up @@ -481,7 +412,7 @@
transformationID = data[0].Destination.Transformations[0].ID
}

respData, statusCode = trans.doPost(ctx, rawJSON, url, stage, stats.Tags{
respData, statusCode = trans.doPost(context.WithValue(ctx, "numEvents", len(data)), rawJSON, url, stage, stats.Tags{

Check failure on line 415 in processor/transformer/transformer.go

View workflow job for this annotation

GitHub Actions / lint

SA1029: should not use built-in type string as key for value; define your own type to avoid collisions (staticcheck)
"destinationType": data[0].Destination.DestinationDefinition.Name,
"destinationId": data[0].Destination.ID,
"sourceId": data[0].Metadata.SourceID,
Expand Down Expand Up @@ -554,6 +485,7 @@
retryCount int
resp *http.Response
respData []byte
numEvents = ctx.Value("numEvents").(int)
)
retryStrategy := backoff.NewExponentialBackOff()
// MaxInterval caps the RetryInterval
Expand All @@ -578,10 +510,28 @@

resp, reqErr = trans.httpClient.Do(req)
})
trans.stat.NewTaggedStat("processor.transformer_request_time", stats.TimerType, tags).SendTiming(time.Since(requestStartTime))
duration := time.Since(requestStartTime)
if duration >= time.Second {
trans.logger.Errorw("duration > 1s", time.Now().Format(misc.RFC3339Milli), duration)
}
trans.stat.NewTaggedStat("processor.transformer_request_time", stats.TimerType, tags).SendTiming(duration)
if reqErr != nil {
return reqErr
}
// TODO: cleanup and adjust for cardinality of X-Instance-ID header
headerResponseTime := resp.Header.Get("X-Response-Time")
instanceWorker := resp.Header.Get("X-Instance-ID")
if instanceWorker != "" {
newTags := lo.Assign(tags)
newTags["instanceWorker"] = instanceWorker
trans.stat.NewTaggedStat("processor_transformer_instance_event_count", stats.CountType, newTags).Count(numEvents)
dur := duration.Milliseconds()
headerTime, err := strconv.ParseFloat(strings.TrimSuffix(headerResponseTime, "ms"), 64)
if err == nil {
diff := float64(dur) - headerTime
trans.stat.NewTaggedStat("processor_tranform_duration_diff_time", stats.TimerType, newTags).SendTiming(time.Duration(diff) * time.Millisecond)

Check failure on line 532 in processor/transformer/transformer.go

View workflow job for this annotation

GitHub Actions / lint

`tranform` is a misspelling of `transform` (misspell)
}
}

defer func() { httputil.CloseResponse(resp) }()

Expand Down
22 changes: 19 additions & 3 deletions router/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
kitsync "github.com/rudderlabs/rudder-go-kit/sync"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
transformerclient "github.com/rudderlabs/rudder-server/internal/transformer-client"
"github.com/rudderlabs/rudder-server/processor/integrations"
"github.com/rudderlabs/rudder-server/router/types"
oauthv2 "github.com/rudderlabs/rudder-server/services/oauth/v2"
Expand All @@ -50,7 +51,7 @@ const (
type handle struct {
tr *http.Transport
// http client for router transformation request
client *http.Client
client sysUtils.HTTPClientI
// Mockable http.client for transformer proxy request
proxyClient sysUtils.HTTPClientI
// http client timeout for transformer proxy request
Expand Down Expand Up @@ -501,7 +502,21 @@ func (trans *handle) setup(destinationTimeout, transformTimeout time.Duration, c
// Basically this timeout we will configure when we make final call to destination to send event
trans.destinationTimeout = destinationTimeout
// This client is used for Router Transformation
trans.client = &http.Client{Transport: trans.tr, Timeout: trans.transformTimeout}
transformerClientConfig := &transformerclient.ClientConfig{
ClientTimeout: trans.transformTimeout,
ClientTTL: config.GetDuration("Transformer.Client.ttl", 120, time.Second),
ClientType: config.GetString("Transformer.Client.type", "stdlib"),
PickerType: config.GetString("Transformer.Client.httplb.pickerType", "power_of_two"),
CheckerType: config.GetString("Transformer.Client.httplb.checkerType", "nop"),

// for now no health checks - can be implemented when there are different clients for UT, DT, TPV
// CheckURL: trans.config.userTransformationURL,
}
transformerClientConfig.TransportConfig.DisableKeepAlives = config.GetBool("Transformer.Client.disableKeepAlives", true)
transformerClientConfig.TransportConfig.MaxConnsPerHost = config.GetInt("Transformer.Client.maxHTTPConnections", 100)
transformerClientConfig.TransportConfig.MaxIdleConnsPerHost = config.GetInt("Transformer.Client.maxHTTPIdleConnections", 10)
transformerClientConfig.TransportConfig.IdleConnTimeout = 30 * time.Second
trans.client = transformerclient.NewClient(transformerClientConfig)
optionalArgs := &oauthv2httpclient.HttpClientOptionalArgs{
Locker: locker,
Augmenter: extensions.RouterBodyAugmenter,
Expand All @@ -517,7 +532,8 @@ func (trans *handle) setup(destinationTimeout, transformTimeout time.Duration, c
Logger: logger.NewLogger().Child("TransformerProxyHttpClient"),
}
// This client is used for Transformer Proxy(delivered from transformer to destination)
trans.proxyClient = &http.Client{Transport: trans.tr, Timeout: trans.destinationTimeout + trans.transformTimeout}
transformerClientConfig.ClientTimeout = trans.destinationTimeout + trans.transformTimeout
trans.proxyClient = transformerclient.NewClient(transformerClientConfig)
// This client is used for Transformer Proxy(delivered from transformer to destination) using oauthV2
trans.proxyClientOAuthV2 = oauthv2httpclient.NewOAuthHttpClient(&http.Client{Transport: trans.tr, Timeout: trans.destinationTimeout + trans.transformTimeout}, common.RudderFlowDelivery, cache, backendConfig, GetAuthErrorCategoryFromTransformProxyResponse, proxyClientOptionalArgs)
trans.transformRequestTimerStat = stats.Default.NewStat("router.transformer_request_time", stats.TimerType)
Expand Down
Loading