Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: enable multi pool client connections #577

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/api_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func getAPIConfig() *API {
context.Background(),
network.Server{
Logger: logger,
Proxy: defaultProxy,
Proxies: []network.IProxy{defaultProxy},
PluginRegistry: pluginReg,
PluginTimeout: config.DefaultPluginTimeout,
Network: "tcp",
Expand Down
2 changes: 1 addition & 1 deletion api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func TestGetServers(t *testing.T) {
Options: network.Option{
EnableTicker: false,
},
Proxy: proxy,
Proxies: []network.IProxy{proxy},
Logger: zerolog.Logger{},
PluginRegistry: pluginRegistry,
PluginTimeout: config.DefaultPluginTimeout,
Expand Down
2 changes: 1 addition & 1 deletion api/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func Test_Healthchecker(t *testing.T) {
Options: network.Option{
EnableTicker: false,
},
Proxy: proxy,
Proxies: []network.IProxy{proxy},
Logger: zerolog.Logger{},
PluginRegistry: pluginRegistry,
PluginTimeout: config.DefaultPluginTimeout,
Expand Down
29 changes: 21 additions & 8 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,18 @@ var runCmd = &cobra.Command{
// Create and initialize servers.
for name, cfg := range conf.Global.Servers {
logger := loggers[name]

var serverProxies []network.IProxy
for _, proxyName := range cfg.Proxies {
proxy, exists := proxies[proxyName]
if !exists {
// This may occur if a proxy referenced in the server configuration does not exist.
logger.Error().Str("proxyName", proxyName).Msg("failed to find proxy configuration")
return
}
serverProxies = append(serverProxies, proxy)
}

servers[name] = network.NewServer(
runCtx,
network.Server{
Expand All @@ -885,14 +897,15 @@ var runCmd = &cobra.Command{
// Can be used to send keepalive messages to the client.
EnableTicker: cfg.EnableTicker,
},
Proxy: proxies[name],
Logger: logger,
PluginRegistry: pluginRegistry,
PluginTimeout: conf.Plugin.Timeout,
EnableTLS: cfg.EnableTLS,
CertFile: cfg.CertFile,
KeyFile: cfg.KeyFile,
HandshakeTimeout: cfg.HandshakeTimeout,
Proxies: serverProxies,
Logger: logger,
PluginRegistry: pluginRegistry,
PluginTimeout: conf.Plugin.Timeout,
EnableTLS: cfg.EnableTLS,
CertFile: cfg.CertFile,
KeyFile: cfg.KeyFile,
HandshakeTimeout: cfg.HandshakeTimeout,
LoadbalancerStrategyName: cfg.LoadBalancer.Strategy,
},
)

Expand Down
62 changes: 49 additions & 13 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ func (c *Config) LoadDefaults(ctx context.Context) *gerr.GatewayDError {
CertFile: "",
KeyFile: "",
HandshakeTimeout: DefaultHandshakeTimeout,
Proxies: []string{Default},
LoadBalancer: LoadBalancer{Strategy: DefaultLoadBalancerStrategy},
}

c.globalDefaults = GlobalConfig{
Expand Down Expand Up @@ -413,7 +415,7 @@ func (c *Config) ValidateGlobalConfig(ctx context.Context) *gerr.GatewayDError {
}

var errors []*gerr.GatewayDError
configObjects := []string{"loggers", "metrics", "clients", "pools", "proxies", "servers"}
configObjects := []string{"loggers", "metrics", "servers"}
sort.Strings(configObjects)
var seenConfigObjects []string

Expand Down Expand Up @@ -441,18 +443,16 @@ func (c *Config) ValidateGlobalConfig(ctx context.Context) *gerr.GatewayDError {
seenConfigObjects = append(seenConfigObjects, "metrics")
}

clientConfigGroups := make(map[string]bool)
for configGroup := range globalConfig.Clients {
clientConfigGroups[configGroup] = true
if globalConfig.Clients[configGroup] == nil {
err := fmt.Errorf("\"clients.%s\" is nil or empty", configGroup)
span.RecordError(err)
errors = append(errors, gerr.ErrValidationFailed.Wrap(err))
}
}

if len(globalConfig.Clients) > 1 {
seenConfigObjects = append(seenConfigObjects, "clients")
}

for configGroup := range globalConfig.Pools {
if globalConfig.Pools[configGroup] == nil {
err := fmt.Errorf("\"pools.%s\" is nil or empty", configGroup)
Expand All @@ -461,10 +461,6 @@ func (c *Config) ValidateGlobalConfig(ctx context.Context) *gerr.GatewayDError {
}
}

if len(globalConfig.Pools) > 1 {
seenConfigObjects = append(seenConfigObjects, "pools")
}

for configGroup := range globalConfig.Proxies {
if globalConfig.Proxies[configGroup] == nil {
err := fmt.Errorf("\"proxies.%s\" is nil or empty", configGroup)
Expand All @@ -473,10 +469,6 @@ func (c *Config) ValidateGlobalConfig(ctx context.Context) *gerr.GatewayDError {
}
}

if len(globalConfig.Proxies) > 1 {
seenConfigObjects = append(seenConfigObjects, "proxies")
}

for configGroup := range globalConfig.Servers {
if globalConfig.Servers[configGroup] == nil {
err := fmt.Errorf("\"servers.%s\" is nil or empty", configGroup)
Expand All @@ -489,6 +481,50 @@ func (c *Config) ValidateGlobalConfig(ctx context.Context) *gerr.GatewayDError {
seenConfigObjects = append(seenConfigObjects, "servers")
}

// ValidateClientsPoolsProxies checks if all configGroups in globalConfig.Pools and globalConfig.Proxies
// are referenced in globalConfig.Clients.
if len(globalConfig.Clients) != len(globalConfig.Pools) || len(globalConfig.Clients) != len(globalConfig.Proxies) {
err := goerrors.New("clients, pools, and proxies do not have the same number of objects")
span.RecordError(err)
errors = append(errors, gerr.ErrValidationFailed.Wrap(err))
}

// Check if all proxies are referenced in client configuration
for configGroup := range globalConfig.Proxies {
if !clientConfigGroups[configGroup] {
err := fmt.Errorf(`"proxies.%s" not referenced in client configuration`, configGroup)
span.RecordError(err)
errors = append(errors, gerr.ErrValidationFailed.Wrap(err))
}
}

// Check if all pools are referenced in client configuration
for configGroup := range globalConfig.Pools {
if !clientConfigGroups[configGroup] {
err := fmt.Errorf(`"pools.%s" not referenced in client configuration`, configGroup)
span.RecordError(err)
errors = append(errors, gerr.ErrValidationFailed.Wrap(err))
}
}

// Each server configuration should have at least one proxy defined.
// Each proxy in the server configuration should be referenced in proxies configuration.
for serverName, server := range globalConfig.Servers {
if len(server.Proxies) == 0 {
err := fmt.Errorf(`"servers.%s" has no proxies defined`, serverName)
span.RecordError(err)
errors = append(errors, gerr.ErrValidationFailed.Wrap(err))
continue
}
for _, proxyName := range server.Proxies {
if _, exists := c.globalDefaults.Proxies[proxyName]; !exists {
err := fmt.Errorf(`"servers.%s" references a non-existent proxy "%s"`, serverName, proxyName)
span.RecordError(err)
errors = append(errors, gerr.ErrValidationFailed.Wrap(err))
}
}
}

sort.Strings(seenConfigObjects)

if len(seenConfigObjects) > 0 && !reflect.DeepEqual(configObjects, seenConfigObjects) {
Expand Down
14 changes: 10 additions & 4 deletions config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,11 @@ const (
DefaultHealthCheckPeriod = 60 * time.Second // This must match PostgreSQL authentication timeout.

// Server constants.
DefaultListenNetwork = "tcp"
DefaultListenAddress = "0.0.0.0:15432"
DefaultTickInterval = 5 * time.Second
DefaultHandshakeTimeout = 5 * time.Second
DefaultListenNetwork = "tcp"
DefaultListenAddress = "0.0.0.0:15432"
DefaultTickInterval = 5 * time.Second
DefaultHandshakeTimeout = 5 * time.Second
DefaultLoadBalancerStrategy = "ROUND_ROBIN"

// Utility constants.
DefaultSeed = 1000
Expand Down Expand Up @@ -124,3 +125,8 @@ const (
DefaultRedisAddress = "localhost:6379"
DefaultRedisChannel = "gatewayd-actions"
)

// Load balancing strategies.
const (
RoundRobinStrategy = "ROUND_ROBIN"
)
6 changes: 6 additions & 0 deletions config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ type Proxy struct {
HealthCheckPeriod time.Duration `json:"healthCheckPeriod" jsonschema:"oneof_type=string;integer"`
}

type LoadBalancer struct {
Strategy string `json:"strategy"`
}

type Server struct {
EnableTicker bool `json:"enableTicker"`
TickInterval time.Duration `json:"tickInterval" jsonschema:"oneof_type=string;integer"`
Expand All @@ -105,6 +109,8 @@ type Server struct {
CertFile string `json:"certFile"`
KeyFile string `json:"keyFile"`
HandshakeTimeout time.Duration `json:"handshakeTimeout" jsonschema:"oneof_type=string;integer"`
Proxies []string `json:"proxies"`
LoadBalancer LoadBalancer `json:"loadBalancer"`
}

type API struct {
Expand Down
10 changes: 10 additions & 0 deletions errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ const (
ErrCodeMsgEncodeError
ErrCodeConfigParseError
ErrCodePublishAsyncAction
ErrCodeLoadBalancerStrategyNotFound
ErrCodeNoProxiesAvailable
)

var (
Expand Down Expand Up @@ -194,6 +196,14 @@ var (
ErrCodePublishAsyncAction, "error publishing async action", nil,
}

ErrLoadBalancerStrategyNotFound = &GatewayDError{
ErrCodeLoadBalancerStrategyNotFound, "The specified load balancer strategy does not exist.", nil,
}

ErrNoProxiesAvailable = &GatewayDError{
ErrCodeNoProxiesAvailable, "No proxies available to select.", nil,
}

// Unwrapped errors.
ErrLoggerRequired = errors.New("terminate action requires a logger parameter")
)
Expand Down
24 changes: 24 additions & 0 deletions gatewayd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,43 @@ clients:
backoff: 1s # duration
backoffMultiplier: 2.0 # 0 means no backoff
disableBackoffCaps: false
default-2:
network: tcp
address: localhost:5433
tcpKeepAlive: False
tcpKeepAlivePeriod: 30s # duration
receiveChunkSize: 8192
receiveDeadline: 0s # duration, 0ms/0s means no deadline
receiveTimeout: 0s # duration, 0ms/0s means no timeout
sendDeadline: 0s # duration, 0ms/0s means no deadline
dialTimeout: 60s # duration
# Retry configuration
retries: 3 # 0 means no retry and fail immediately on the first attempt
backoff: 1s # duration
backoffMultiplier: 2.0 # 0 means no backoff
disableBackoffCaps: false

pools:
default:
size: 10
default-2:
size: 10

proxies:
default:
healthCheckPeriod: 60s # duration
default-2:
healthCheckPeriod: 60s # duration

servers:
default:
network: tcp
address: 0.0.0.0:15432
proxies:
- "default"
- "default-2"
loadBalancer:
strategy: ROUND_ROBIN
mostafa marked this conversation as resolved.
Show resolved Hide resolved
enableTicker: False
tickInterval: 5s # duration
enableTLS: False
Expand Down
19 changes: 19 additions & 0 deletions network/loadbalancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package network

import (
"github.com/gatewayd-io/gatewayd/config"
gerr "github.com/gatewayd-io/gatewayd/errors"
)

type LoadBalancerStrategy interface {
NextProxy() (IProxy, *gerr.GatewayDError)
}

func NewLoadBalancerStrategy(server *Server) (LoadBalancerStrategy, *gerr.GatewayDError) {
switch server.LoadbalancerStrategyName {
case config.RoundRobinStrategy:
return NewRoundRobin(server), nil
default:
return nil, gerr.ErrLoadBalancerStrategyNotFound
}
}
44 changes: 44 additions & 0 deletions network/loadbalancer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package network

import (
"errors"
"testing"

"github.com/gatewayd-io/gatewayd/config"
gerr "github.com/gatewayd-io/gatewayd/errors"
)

// TestNewLoadBalancerStrategy tests the NewLoadBalancerStrategy function to ensure it correctly
// initializes the load balancer strategy based on the strategy name provided in the server configuration.
// It covers both valid and invalid strategy names.
func TestNewLoadBalancerStrategy(t *testing.T) {
serverValid := &Server{
LoadbalancerStrategyName: config.RoundRobinStrategy,
Proxies: []IProxy{MockProxy{}},
}

// Test case 1: Valid strategy name
strategy, err := NewLoadBalancerStrategy(serverValid)
if err != nil {
t.Errorf("Expected no error, got %v", err)
}

_, ok := strategy.(*RoundRobin)
if !ok {
t.Errorf("Expected strategy to be of type RoundRobin")
}

// Test case 2: InValid strategy name
serverInvalid := &Server{
LoadbalancerStrategyName: "InvalidStrategy",
Proxies: []IProxy{MockProxy{}},
}

strategy, err = NewLoadBalancerStrategy(serverInvalid)
if !errors.Is(err, gerr.ErrLoadBalancerStrategyNotFound) {
t.Errorf("Expected ErrLoadBalancerStrategyNotFound, got %v", err)
}
if strategy != nil {
t.Errorf("Expected strategy to be nil for invalid strategy name")
}
}
Loading