From 0a7c0721ab10935a8b7a51988d79ee5cb151c7c6 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Wed, 4 Sep 2024 16:37:24 -0600 Subject: [PATCH] Add support for interpolate params --- pkg/dbconn/conn.go | 2 ++ pkg/dbconn/conn_test.go | 15 +++++++++++---- pkg/dbconn/dbconn.go | 6 ++++-- pkg/migration/migration.go | 1 + pkg/migration/runner.go | 1 + 5 files changed, 19 insertions(+), 6 deletions(-) diff --git a/pkg/dbconn/conn.go b/pkg/dbconn/conn.go index dd8dd3a..8ea1ea3 100644 --- a/pkg/dbconn/conn.go +++ b/pkg/dbconn/conn.go @@ -3036,6 +3036,8 @@ func newDSN(dsn string, config *DBConfig) (string, error) { // This behaviour has been observed during blue/green upgrades and failover on AWS Aurora. // See also: https://github.com/go-sql-driver/mysql?tab=readme-ov-file#rejectreadonly ops = append(ops, fmt.Sprintf("%s=%s", "rejectReadOnly", "true")) + // Set interpolateParams + ops = append(ops, fmt.Sprintf("%s=%t", "interpolateParams", config.InterpolateParams)) dsn = fmt.Sprintf("%s?%s", dsn, strings.Join(ops, "&")) return dsn, nil } diff --git a/pkg/dbconn/conn_test.go b/pkg/dbconn/conn_test.go index 9d5b604..976cf35 100644 --- a/pkg/dbconn/conn_test.go +++ b/pkg/dbconn/conn_test.go @@ -13,25 +13,32 @@ func TestNewDSN(t *testing.T) { dsn := "root:password@tcp(127.0.0.1:3306)/test" resp, err := newDSN(dsn, NewDBConfig()) assert.NoError(t, err) - assert.Equal(t, "root:password@tcp(127.0.0.1:3306)/test?sql_mode=%22%22&time_zone=%22%2B00%3A00%22&innodb_lock_wait_timeout=3&lock_wait_timeout=30&range_optimizer_max_mem_size=0&transaction_isolation=%22read-committed%22&charset=binary&collation=binary&rejectReadOnly=true", resp) + assert.Equal(t, "root:password@tcp(127.0.0.1:3306)/test?sql_mode=%22%22&time_zone=%22%2B00%3A00%22&innodb_lock_wait_timeout=3&lock_wait_timeout=30&range_optimizer_max_mem_size=0&transaction_isolation=%22read-committed%22&charset=binary&collation=binary&rejectReadOnly=true&interpolateParams=false", resp) + + // With interpolate on. + config := NewDBConfig() + config.InterpolateParams = true + resp, err = newDSN(dsn, config) + assert.NoError(t, err) + assert.Equal(t, "root:password@tcp(127.0.0.1:3306)/test?sql_mode=%22%22&time_zone=%22%2B00%3A00%22&innodb_lock_wait_timeout=3&lock_wait_timeout=30&range_optimizer_max_mem_size=0&transaction_isolation=%22read-committed%22&charset=binary&collation=binary&rejectReadOnly=true&interpolateParams=true", resp) // Also without TLS options dsn = "root:password@tcp(mydbhost.internal:3306)/test" resp, err = newDSN(dsn, NewDBConfig()) assert.NoError(t, err) - assert.Equal(t, "root:password@tcp(mydbhost.internal:3306)/test?sql_mode=%22%22&time_zone=%22%2B00%3A00%22&innodb_lock_wait_timeout=3&lock_wait_timeout=30&range_optimizer_max_mem_size=0&transaction_isolation=%22read-committed%22&charset=binary&collation=binary&rejectReadOnly=true", resp) // unchanged + assert.Equal(t, "root:password@tcp(mydbhost.internal:3306)/test?sql_mode=%22%22&time_zone=%22%2B00%3A00%22&innodb_lock_wait_timeout=3&lock_wait_timeout=30&range_optimizer_max_mem_size=0&transaction_isolation=%22read-committed%22&charset=binary&collation=binary&rejectReadOnly=true&interpolateParams=false", resp) // unchanged // However, if it is RDS - it will be changed. dsn = "root:password@tcp(tern-001.cluster-ro-ckxxxxxxvm.us-west-2.rds.amazonaws.com)/test" resp, err = newDSN(dsn, NewDBConfig()) assert.NoError(t, err) - assert.Equal(t, "root:password@tcp(tern-001.cluster-ro-ckxxxxxxvm.us-west-2.rds.amazonaws.com)/test?tls=rds&sql_mode=%22%22&time_zone=%22%2B00%3A00%22&innodb_lock_wait_timeout=3&lock_wait_timeout=30&range_optimizer_max_mem_size=0&transaction_isolation=%22read-committed%22&charset=binary&collation=binary&rejectReadOnly=true", resp) + assert.Equal(t, "root:password@tcp(tern-001.cluster-ro-ckxxxxxxvm.us-west-2.rds.amazonaws.com)/test?tls=rds&sql_mode=%22%22&time_zone=%22%2B00%3A00%22&innodb_lock_wait_timeout=3&lock_wait_timeout=30&range_optimizer_max_mem_size=0&transaction_isolation=%22read-committed%22&charset=binary&collation=binary&rejectReadOnly=true&interpolateParams=false", resp) // This is with optional port too dsn = "root:password@tcp(tern-001.cluster-ro-ckxxxxxxvm.us-west-2.rds.amazonaws.com:12345)/test" resp, err = newDSN(dsn, NewDBConfig()) assert.NoError(t, err) - assert.Equal(t, "root:password@tcp(tern-001.cluster-ro-ckxxxxxxvm.us-west-2.rds.amazonaws.com:12345)/test?tls=rds&sql_mode=%22%22&time_zone=%22%2B00%3A00%22&innodb_lock_wait_timeout=3&lock_wait_timeout=30&range_optimizer_max_mem_size=0&transaction_isolation=%22read-committed%22&charset=binary&collation=binary&rejectReadOnly=true", resp) + assert.Equal(t, "root:password@tcp(tern-001.cluster-ro-ckxxxxxxvm.us-west-2.rds.amazonaws.com:12345)/test?tls=rds&sql_mode=%22%22&time_zone=%22%2B00%3A00%22&innodb_lock_wait_timeout=3&lock_wait_timeout=30&range_optimizer_max_mem_size=0&transaction_isolation=%22read-committed%22&charset=binary&collation=binary&rejectReadOnly=true&interpolateParams=false", resp) // Invalid DSN, can't parse. dsn = "invalid" diff --git a/pkg/dbconn/dbconn.go b/pkg/dbconn/dbconn.go index 1fef7d9..08e8323 100644 --- a/pkg/dbconn/dbconn.go +++ b/pkg/dbconn/dbconn.go @@ -30,6 +30,7 @@ type DBConfig struct { MaxRetries int MaxOpenConnections int RangeOptimizerMaxMemSize int64 + InterpolateParams bool } func NewDBConfig() *DBConfig { @@ -37,8 +38,9 @@ func NewDBConfig() *DBConfig { LockWaitTimeout: 30, InnodbLockWaitTimeout: 3, MaxRetries: 5, - MaxOpenConnections: 32, // default is high for historical tests. It's overwritten by the user threads count + 2 for headroom. - RangeOptimizerMaxMemSize: 0, // default is 8M, we set to unlimited. Not user configurable (may reconsider in the future). + MaxOpenConnections: 32, // default is high for historical tests. It's overwritten by the user threads count + 2 for headroom. + RangeOptimizerMaxMemSize: 0, // default is 8M, we set to unlimited. Not user configurable (may reconsider in the future). + InterpolateParams: false, // default is false } } diff --git a/pkg/migration/migration.go b/pkg/migration/migration.go index fed991b..eb9e143 100644 --- a/pkg/migration/migration.go +++ b/pkg/migration/migration.go @@ -30,6 +30,7 @@ type Migration struct { SkipDropAfterCutover bool `name:"skip-drop-after-cutover" help:"Keep old table after completing cutover" optional:"" default:"false"` DeferCutOver bool `name:"defer-cutover" help:"Defer cutover (and checksum) until sentinel table is dropped" optional:"" default:"false"` Strict bool `name:"strict" help:"Exit on --alter mismatch when incomplete migration is detected" optional:"" default:"false"` + InterpolateParams bool `name:"interpolate-params" help:"Enable interpolate params for DSN" optional:"" default:"false" hidden:""` } func (m *Migration) Run() error { diff --git a/pkg/migration/runner.go b/pkg/migration/runner.go index a2aa473..eebaddc 100644 --- a/pkg/migration/runner.go +++ b/pkg/migration/runner.go @@ -175,6 +175,7 @@ func (r *Runner) Run(originalCtx context.Context) error { var err error r.dbConfig = dbconn.NewDBConfig() r.dbConfig.LockWaitTimeout = int(r.migration.LockWaitTimeout.Seconds()) + r.dbConfig.InterpolateParams = r.migration.InterpolateParams // The copier and checker will use Threads to limit N tasks concurrently, // but we also set it at the DB pool level with +1. Because the copier and // the replication applier use the same pool, it allows for some natural throttling