Skip to content

Commit

Permalink
Merge pull request #346 from cashapp/mtocker-add-interpolate-params
Browse files Browse the repository at this point in the history
Add support for interpolate params
  • Loading branch information
morgo authored Sep 4, 2024
2 parents 5a7c164 + 0a7c072 commit fd5b349
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 6 deletions.
2 changes: 2 additions & 0 deletions pkg/dbconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -3036,6 +3036,8 @@ func newDSN(dsn string, config *DBConfig) (string, error) {
// This behaviour has been observed during blue/green upgrades and failover on AWS Aurora.
// See also: https://github.com/go-sql-driver/mysql?tab=readme-ov-file#rejectreadonly
ops = append(ops, fmt.Sprintf("%s=%s", "rejectReadOnly", "true"))
// Set interpolateParams
ops = append(ops, fmt.Sprintf("%s=%t", "interpolateParams", config.InterpolateParams))
dsn = fmt.Sprintf("%s?%s", dsn, strings.Join(ops, "&"))
return dsn, nil
}
Expand Down
15 changes: 11 additions & 4 deletions pkg/dbconn/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,32 @@ func TestNewDSN(t *testing.T) {
dsn := "root:password@tcp(127.0.0.1:3306)/test"
resp, err := newDSN(dsn, NewDBConfig())
assert.NoError(t, err)
assert.Equal(t, "root:password@tcp(127.0.0.1:3306)/test?sql_mode=%22%22&time_zone=%22%2B00%3A00%22&innodb_lock_wait_timeout=3&lock_wait_timeout=30&range_optimizer_max_mem_size=0&transaction_isolation=%22read-committed%22&charset=binary&collation=binary&rejectReadOnly=true", resp)
assert.Equal(t, "root:password@tcp(127.0.0.1:3306)/test?sql_mode=%22%22&time_zone=%22%2B00%3A00%22&innodb_lock_wait_timeout=3&lock_wait_timeout=30&range_optimizer_max_mem_size=0&transaction_isolation=%22read-committed%22&charset=binary&collation=binary&rejectReadOnly=true&interpolateParams=false", resp)

// With interpolate on.
config := NewDBConfig()
config.InterpolateParams = true
resp, err = newDSN(dsn, config)
assert.NoError(t, err)
assert.Equal(t, "root:password@tcp(127.0.0.1:3306)/test?sql_mode=%22%22&time_zone=%22%2B00%3A00%22&innodb_lock_wait_timeout=3&lock_wait_timeout=30&range_optimizer_max_mem_size=0&transaction_isolation=%22read-committed%22&charset=binary&collation=binary&rejectReadOnly=true&interpolateParams=true", resp)

// Also without TLS options
dsn = "root:password@tcp(mydbhost.internal:3306)/test"
resp, err = newDSN(dsn, NewDBConfig())
assert.NoError(t, err)
assert.Equal(t, "root:password@tcp(mydbhost.internal:3306)/test?sql_mode=%22%22&time_zone=%22%2B00%3A00%22&innodb_lock_wait_timeout=3&lock_wait_timeout=30&range_optimizer_max_mem_size=0&transaction_isolation=%22read-committed%22&charset=binary&collation=binary&rejectReadOnly=true", resp) // unchanged
assert.Equal(t, "root:password@tcp(mydbhost.internal:3306)/test?sql_mode=%22%22&time_zone=%22%2B00%3A00%22&innodb_lock_wait_timeout=3&lock_wait_timeout=30&range_optimizer_max_mem_size=0&transaction_isolation=%22read-committed%22&charset=binary&collation=binary&rejectReadOnly=true&interpolateParams=false", resp) // unchanged

// However, if it is RDS - it will be changed.
dsn = "root:password@tcp(tern-001.cluster-ro-ckxxxxxxvm.us-west-2.rds.amazonaws.com)/test"
resp, err = newDSN(dsn, NewDBConfig())
assert.NoError(t, err)
assert.Equal(t, "root:password@tcp(tern-001.cluster-ro-ckxxxxxxvm.us-west-2.rds.amazonaws.com)/test?tls=rds&sql_mode=%22%22&time_zone=%22%2B00%3A00%22&innodb_lock_wait_timeout=3&lock_wait_timeout=30&range_optimizer_max_mem_size=0&transaction_isolation=%22read-committed%22&charset=binary&collation=binary&rejectReadOnly=true", resp)
assert.Equal(t, "root:password@tcp(tern-001.cluster-ro-ckxxxxxxvm.us-west-2.rds.amazonaws.com)/test?tls=rds&sql_mode=%22%22&time_zone=%22%2B00%3A00%22&innodb_lock_wait_timeout=3&lock_wait_timeout=30&range_optimizer_max_mem_size=0&transaction_isolation=%22read-committed%22&charset=binary&collation=binary&rejectReadOnly=true&interpolateParams=false", resp)

// This is with optional port too
dsn = "root:password@tcp(tern-001.cluster-ro-ckxxxxxxvm.us-west-2.rds.amazonaws.com:12345)/test"
resp, err = newDSN(dsn, NewDBConfig())
assert.NoError(t, err)
assert.Equal(t, "root:password@tcp(tern-001.cluster-ro-ckxxxxxxvm.us-west-2.rds.amazonaws.com:12345)/test?tls=rds&sql_mode=%22%22&time_zone=%22%2B00%3A00%22&innodb_lock_wait_timeout=3&lock_wait_timeout=30&range_optimizer_max_mem_size=0&transaction_isolation=%22read-committed%22&charset=binary&collation=binary&rejectReadOnly=true", resp)
assert.Equal(t, "root:password@tcp(tern-001.cluster-ro-ckxxxxxxvm.us-west-2.rds.amazonaws.com:12345)/test?tls=rds&sql_mode=%22%22&time_zone=%22%2B00%3A00%22&innodb_lock_wait_timeout=3&lock_wait_timeout=30&range_optimizer_max_mem_size=0&transaction_isolation=%22read-committed%22&charset=binary&collation=binary&rejectReadOnly=true&interpolateParams=false", resp)

// Invalid DSN, can't parse.
dsn = "invalid"
Expand Down
6 changes: 4 additions & 2 deletions pkg/dbconn/dbconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,17 @@ type DBConfig struct {
MaxRetries int
MaxOpenConnections int
RangeOptimizerMaxMemSize int64
InterpolateParams bool
}

func NewDBConfig() *DBConfig {
return &DBConfig{
LockWaitTimeout: 30,
InnodbLockWaitTimeout: 3,
MaxRetries: 5,
MaxOpenConnections: 32, // default is high for historical tests. It's overwritten by the user threads count + 2 for headroom.
RangeOptimizerMaxMemSize: 0, // default is 8M, we set to unlimited. Not user configurable (may reconsider in the future).
MaxOpenConnections: 32, // default is high for historical tests. It's overwritten by the user threads count + 2 for headroom.
RangeOptimizerMaxMemSize: 0, // default is 8M, we set to unlimited. Not user configurable (may reconsider in the future).
InterpolateParams: false, // default is false
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/migration/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Migration struct {
SkipDropAfterCutover bool `name:"skip-drop-after-cutover" help:"Keep old table after completing cutover" optional:"" default:"false"`
DeferCutOver bool `name:"defer-cutover" help:"Defer cutover (and checksum) until sentinel table is dropped" optional:"" default:"false"`
Strict bool `name:"strict" help:"Exit on --alter mismatch when incomplete migration is detected" optional:"" default:"false"`
InterpolateParams bool `name:"interpolate-params" help:"Enable interpolate params for DSN" optional:"" default:"false" hidden:""`
}

func (m *Migration) Run() error {
Expand Down
1 change: 1 addition & 0 deletions pkg/migration/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ func (r *Runner) Run(originalCtx context.Context) error {
var err error
r.dbConfig = dbconn.NewDBConfig()
r.dbConfig.LockWaitTimeout = int(r.migration.LockWaitTimeout.Seconds())
r.dbConfig.InterpolateParams = r.migration.InterpolateParams
// The copier and checker will use Threads to limit N tasks concurrently,
// but we also set it at the DB pool level with +1. Because the copier and
// the replication applier use the same pool, it allows for some natural throttling
Expand Down

0 comments on commit fd5b349

Please sign in to comment.