Skip to content

Commit

Permalink
NEOS-1641: Fixes stale DB Connections during Data Sync Workflow (#2995)
Browse files Browse the repository at this point in the history
  • Loading branch information
nickzelei authored Dec 2, 2024
1 parent 4cab8af commit ca71920
Show file tree
Hide file tree
Showing 84 changed files with 1,888 additions and 2,267 deletions.
14 changes: 7 additions & 7 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,14 @@ issues:
- keycloak
# # Excluding configuration per-path, per-linter, per-text and per-source
exclude-rules:
- path: _test\.go
linters:
- gomnd
- lll
- goconst
- gosec
- path: ^mock_.*\.go$
text: "exclude mock files"
linters:
- gocritic
- gofmt
- goimports
- gosimple
- stylecheck
- whitespace

run:
timeout: 10m
Expand Down
2 changes: 1 addition & 1 deletion .mockery.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ packages:
interfaces:
ManagerInterface:
ClientInterface:
github.com/nucleuscloud/neosync/internal/connection-tunnel-manager:
github.com/nucleuscloud/neosync/internal/connection-manager:
interfaces:
ConnectionProvider:
github.com/nucleuscloud/neosync/worker/pkg/benthos/dynamodb:
Expand Down
24 changes: 17 additions & 7 deletions backend/internal/cmds/mgmt/serve/connect/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"net/http"
"net/url"
"strconv"
"sync"
"time"

"connectrpc.com/connect"
Expand All @@ -19,6 +18,7 @@ import (
"github.com/auth0/go-jwt-middleware/v2/validator"
"github.com/go-logr/logr"
"github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1/mgmtv1alpha1connect"
connectionmanager "github.com/nucleuscloud/neosync/internal/connection-manager"
"github.com/nucleuscloud/neosync/internal/connectrpc/validate"
http_client "github.com/nucleuscloud/neosync/internal/http/client"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
Expand Down Expand Up @@ -464,14 +464,24 @@ func serve(ctx context.Context) error {
pgquerier := pg_queries.New()
mysqlquerier := mysql_queries.New()
sqlConnector := &sqlconnect.SqlOpenConnector{}
pgpoolmap := &sync.Map{}
mysqlpoolmap := &sync.Map{}
mssqlpoolmap := &sync.Map{}
mssqlquerier := mssql_queries.New()
sqlmanager := sql_manager.NewSqlManager(pgpoolmap, pgquerier, mysqlpoolmap, mysqlquerier, mssqlpoolmap, mssqlquerier, sqlConnector)

sqlmanager := sql_manager.NewSqlManager(
sql_manager.WithPostgresQuerier(pgquerier),
sql_manager.WithMysqlQuerier(mysqlquerier),
sql_manager.WithMssqlQuerier(mssqlquerier),
sql_manager.WithConnectionManagerOpts(connectionmanager.WithCloseOnRelease()),
)
mongoconnector := mongoconnect.NewConnector()
connectionService := v1alpha1_connectionservice.New(&v1alpha1_connectionservice.Config{}, db, useraccountService, sqlConnector, pgquerier,
mysqlquerier, mssqlquerier, mongoconnector, awsManager)
connectionService := v1alpha1_connectionservice.New(
&v1alpha1_connectionservice.Config{},
db,
useraccountService,
mongoconnector,
awsManager,
sqlmanager,
&sqlconnect.SqlOpenConnector{},
)
api.Handle(
mgmtv1alpha1connect.NewConnectionServiceHandler(
connectionService,
Expand Down
3 changes: 2 additions & 1 deletion backend/pkg/dbconnect-config/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1"
"github.com/nucleuscloud/neosync/backend/pkg/clienttls"
sqlmanager_shared "github.com/nucleuscloud/neosync/backend/pkg/sqlmanager/shared"
)

type pgConnectConfig struct {
Expand Down Expand Up @@ -37,7 +38,7 @@ func NewFromPostgresConnection(
}

pgurl := url.URL{
Scheme: "postgres",
Scheme: sqlmanager_shared.DefaultPostgresDriver,
Host: host,
}
if cc.Connection.GetUser() != "" && cc.Connection.GetPass() != "" {
Expand Down
49 changes: 13 additions & 36 deletions backend/pkg/integration-test/integration-test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"log/slog"
"net/http"
"net/http/httptest"
"sync"
"testing"

"connectrpc.com/connect"
Expand All @@ -26,7 +25,6 @@ import (
clientmanager "github.com/nucleuscloud/neosync/backend/internal/temporal/clientmanager"
"github.com/nucleuscloud/neosync/backend/internal/utils"
"github.com/nucleuscloud/neosync/backend/pkg/mongoconnect"
mssql_queries "github.com/nucleuscloud/neosync/backend/pkg/mssql-querier"
"github.com/nucleuscloud/neosync/backend/pkg/sqlconnect"
"github.com/nucleuscloud/neosync/backend/pkg/sqlmanager"
v1alpha_anonymizationservice "github.com/nucleuscloud/neosync/backend/services/mgmt/v1alpha1/anonymization-service"
Expand All @@ -37,6 +35,7 @@ import (
v1alpha1_useraccountservice "github.com/nucleuscloud/neosync/backend/services/mgmt/v1alpha1/user-account-service"
awsmanager "github.com/nucleuscloud/neosync/internal/aws"
"github.com/nucleuscloud/neosync/internal/billing"
connectionmanager "github.com/nucleuscloud/neosync/internal/connection-manager"
presidioapi "github.com/nucleuscloud/neosync/internal/ee/presidio"
http_client "github.com/nucleuscloud/neosync/internal/http/client"
neomigrate "github.com/nucleuscloud/neosync/internal/migrate"
Expand Down Expand Up @@ -188,16 +187,16 @@ func (s *NeosyncApiTestClient) Setup(ctx context.Context, t *testing.T) error {
nil,
)

sqlmanagerclient := NewTestSqlManagerClient()

authdConnectionService := v1alpha1_connectionservice.New(
&v1alpha1_connectionservice.Config{},
neosyncdb.New(pgcontainer.DB, db_queries.New()),
authdUserService,
&sqlconnect.SqlOpenConnector{},
pg_queries.New(),
mysql_queries.New(),
mssql_queries.New(),
mongoconnect.NewConnector(),
awsmanager.New(),
sqlmanagerclient,
&sqlconnect.SqlOpenConnector{},
)

neoCloudAuthdUserService := v1alpha1_useraccountservice.New(
Expand All @@ -221,12 +220,10 @@ func (s *NeosyncApiTestClient) Setup(ctx context.Context, t *testing.T) error {
&v1alpha1_connectionservice.Config{},
neosyncdb.New(pgcontainer.DB, db_queries.New()),
neoCloudAuthdUserService,
&sqlconnect.SqlOpenConnector{},
pg_queries.New(),
mysql_queries.New(),
mssql_queries.New(),
mongoconnect.NewConnector(),
awsmanager.New(),
sqlmanagerclient,
&sqlconnect.SqlOpenConnector{},
)
neoCloudJobHookService := jobhooks.New(
neosyncdb.New(pgcontainer.DB, db_queries.New()),
Expand All @@ -239,12 +236,7 @@ func (s *NeosyncApiTestClient) Setup(ctx context.Context, t *testing.T) error {
s.Mocks.TemporalClientManager,
neoCloudConnectionService,
neoCloudAuthdUserService,
sqlmanager.NewSqlManager(
&sync.Map{}, pg_queries.New(),
&sync.Map{}, mysql_queries.New(),
&sync.Map{}, mssql_queries.New(),
&sqlconnect.SqlOpenConnector{},
),
sqlmanagerclient,
neoCloudJobHookService,
)

Expand All @@ -262,12 +254,10 @@ func (s *NeosyncApiTestClient) Setup(ctx context.Context, t *testing.T) error {
&v1alpha1_connectionservice.Config{},
neosyncdb.New(pgcontainer.DB, db_queries.New()),
unauthdUserService,
&sqlconnect.SqlOpenConnector{},
pg_queries.New(),
mysql_queries.New(),
mssql_queries.New(),
mongoconnect.NewConnector(),
awsmanager.New(),
sqlmanagerclient,
&sqlconnect.SqlOpenConnector{},
)

jobhookService := jobhooks.New(
Expand All @@ -281,12 +271,7 @@ func (s *NeosyncApiTestClient) Setup(ctx context.Context, t *testing.T) error {
s.Mocks.TemporalClientManager,
unauthdConnectionsService,
unauthdUserService,
sqlmanager.NewSqlManager(
&sync.Map{}, pg_queries.New(),
&sync.Map{}, mysql_queries.New(),
&sync.Map{}, mssql_queries.New(),
&sqlconnect.SqlOpenConnector{},
),
sqlmanagerclient,
jobhookService,
)

Expand All @@ -300,12 +285,7 @@ func (s *NeosyncApiTestClient) Setup(ctx context.Context, t *testing.T) error {
pg_queries.New(),
mysql_queries.New(),
mongoconnect.NewConnector(),
sqlmanager.NewSqlManager(
&sync.Map{}, pg_queries.New(),
&sync.Map{}, mysql_queries.New(),
&sync.Map{}, mssql_queries.New(),
&sqlconnect.SqlOpenConnector{},
),
sqlmanagerclient,
neosync_gcp.NewManager(),
)

Expand Down Expand Up @@ -478,9 +458,6 @@ func startHTTPServer(tb testing.TB, h http.Handler) *httptest.Server {

func NewTestSqlManagerClient() *sqlmanager.SqlManager {
return sqlmanager.NewSqlManager(
&sync.Map{}, pg_queries.New(),
&sync.Map{}, mysql_queries.New(),
&sync.Map{}, mssql_queries.New(),
&sqlconnect.SqlOpenConnector{},
sqlmanager.WithConnectionManagerOpts(connectionmanager.WithCloseOnRelease()),
)
}
33 changes: 16 additions & 17 deletions backend/pkg/sqlconnect/mock_SqlConnector.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 21 additions & 8 deletions backend/pkg/sqlconnect/sql-connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1"
"github.com/nucleuscloud/neosync/backend/pkg/clienttls"
dbconnectconfig "github.com/nucleuscloud/neosync/backend/pkg/dbconnect-config"
sqlmanager_shared "github.com/nucleuscloud/neosync/backend/pkg/sqlmanager/shared"
tun "github.com/nucleuscloud/neosync/internal/sshtunnel"
"github.com/nucleuscloud/neosync/internal/sshtunnel/connectors/mssqltunconnector"
"github.com/nucleuscloud/neosync/internal/sshtunnel/connectors/mysqltunconnector"
Expand All @@ -39,6 +40,8 @@ type SqlConnectorOption func(*sqlConnectorOptions)
type sqlConnectorOptions struct {
mysqlDisableParseTime bool
postgresDriver string

connectionTimeoutSeconds *uint32
}

// WithMysqlParseTimeDisabled disables MySQL time parsing
Expand All @@ -51,26 +54,36 @@ func WithMysqlParseTimeDisabled() SqlConnectorOption {
// WithPostgresDriver overrides default postgres driver
func WithDefaultPostgresDriver() SqlConnectorOption {
return func(opts *sqlConnectorOptions) {
opts.postgresDriver = "postgres"
opts.postgresDriver = sqlmanager_shared.DefaultPostgresDriver
}
}

// Provide an integer number that corresponds to the number of seconds to wait before timing out attempting to connect.
// Ex: 10 == 10 seconds
func WithConnectionTimeout(timeoutSeconds uint32) SqlConnectorOption {
return func(sco *sqlConnectorOptions) {
sco.connectionTimeoutSeconds = &timeoutSeconds
}
}

type SqlConnector interface {
NewDbFromConnectionConfig(connectionConfig *mgmtv1alpha1.ConnectionConfig, connectionTimeout *uint32, logger *slog.Logger, opts ...SqlConnectorOption) (SqlDbContainer, error)
NewDbFromConnectionConfig(connectionConfig *mgmtv1alpha1.ConnectionConfig, logger *slog.Logger, opts ...SqlConnectorOption) (SqlDbContainer, error)
}

type SqlOpenConnector struct{}

func (rc *SqlOpenConnector) NewDbFromConnectionConfig(cc *mgmtv1alpha1.ConnectionConfig, connectionTimeout *uint32, logger *slog.Logger, opts ...SqlConnectorOption) (SqlDbContainer, error) {
func (rc *SqlOpenConnector) NewDbFromConnectionConfig(cc *mgmtv1alpha1.ConnectionConfig, logger *slog.Logger, opts ...SqlConnectorOption) (SqlDbContainer, error) {
if cc == nil {
return nil, errors.New("connectionConfig was nil, expected *mgmtv1alpha1.ConnectionConfig")
}

options := sqlConnectorOptions{
postgresDriver: "pgx",
postgresDriver: sqlmanager_shared.PostgresDriver,
}
for _, opt := range opts {
opt(&options)
if opt != nil {
opt(&options)
}
}

dbconnopts, err := getConnectionOptsFromConnectionConfig(cc)
Expand All @@ -86,7 +99,7 @@ func (rc *SqlOpenConnector) NewDbFromConnectionConfig(cc *mgmtv1alpha1.Connectio
return nil, fmt.Errorf("unable to upsert client tls files: %w", err)
}
}
connDetails, err := dbconnectconfig.NewFromPostgresConnection(config, connectionTimeout, logger)
connDetails, err := dbconnectconfig.NewFromPostgresConnection(config, options.connectionTimeoutSeconds, logger)
if err != nil {
return nil, err
}
Expand All @@ -107,7 +120,7 @@ func (rc *SqlOpenConnector) NewDbFromConnectionConfig(cc *mgmtv1alpha1.Connectio
return newStdlibContainer(options.postgresDriver, dsn, dbconnopts), nil
}
case *mgmtv1alpha1.ConnectionConfig_MysqlConfig:
connDetails, err := dbconnectconfig.NewFromMysqlConnection(config, connectionTimeout, logger, options.mysqlDisableParseTime)
connDetails, err := dbconnectconfig.NewFromMysqlConnection(config, options.connectionTimeoutSeconds, logger, options.mysqlDisableParseTime)
if err != nil {
return nil, err
}
Expand All @@ -127,7 +140,7 @@ func (rc *SqlOpenConnector) NewDbFromConnectionConfig(cc *mgmtv1alpha1.Connectio
}
return newStdlibContainer("mysql", dsn, dbconnopts), nil
case *mgmtv1alpha1.ConnectionConfig_MssqlConfig:
connDetails, err := dbconnectconfig.NewFromMssqlConnection(config, connectionTimeout)
connDetails, err := dbconnectconfig.NewFromMssqlConnection(config, options.connectionTimeoutSeconds)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit ca71920

Please sign in to comment.