diff --git a/pkg/checksum/checker.go b/pkg/checksum/checker.go index bcd2477c..9c3f6aa8 100644 --- a/pkg/checksum/checker.go +++ b/pkg/checksum/checker.go @@ -28,7 +28,7 @@ type Checker struct { concurrency int feed *repl.Client db *sql.DB - trxPool *dbconn.TrxPool + rrConnPool *dbconn.ConnPool isInvalid bool chunker table.Chunker StartTime time.Time @@ -82,13 +82,13 @@ func NewChecker(db *sql.DB, tbl, newTable *table.TableInfo, feed *repl.Client, c return checksum, nil } -func (c *Checker) ChecksumChunk(trxPool *dbconn.TrxPool, chunk *table.Chunk) error { +func (c *Checker) ChecksumChunk(rrConnPool *dbconn.ConnPool, chunk *table.Chunk) error { startTime := time.Now() - trx, err := trxPool.Get() + rrConn, err := rrConnPool.Get() if err != nil { return err } - defer trxPool.Put(trx) + defer rrConnPool.Put(rrConn) c.logger.Debugf("checksumming chunk: %s", chunk.String()) source := fmt.Sprintf("SELECT BIT_XOR(CRC32(CONCAT(%s))) as checksum FROM %s WHERE %s", c.intersectColumns(), @@ -101,11 +101,11 @@ func (c *Checker) ChecksumChunk(trxPool *dbconn.TrxPool, chunk *table.Chunk) err chunk.String(), ) var sourceChecksum, targetChecksum int64 - err = trx.QueryRow(source).Scan(&sourceChecksum) + err = rrConn.QueryRowContext(context.TODO(), source).Scan(&sourceChecksum) if err != nil { return err } - err = trx.QueryRow(target).Scan(&targetChecksum) + err = rrConn.QueryRowContext(context.TODO(), target).Scan(&targetChecksum) if err != nil { return err } @@ -184,7 +184,7 @@ func (c *Checker) Run(ctx context.Context) error { // The table. They MUST be created before the lock is released // with REPEATABLE-READ and a consistent snapshot (or dummy read) // to initialize the read-view. - c.trxPool, err = dbconn.NewTrxPool(ctx, c.db, c.concurrency, c.dbConfig) + c.rrConnPool, err = dbconn.NewRRConnPool(ctx, c.db, c.concurrency, c.dbConfig) if err != nil { return err } @@ -222,7 +222,7 @@ func (c *Checker) Run(ctx context.Context) error { c.isInvalid = true return err } - if err := c.ChecksumChunk(c.trxPool, chunk); err != nil { + if err := c.ChecksumChunk(c.rrConnPool, chunk); err != nil { c.isInvalid = true return err } @@ -234,7 +234,7 @@ func (c *Checker) Run(ctx context.Context) error { // Regardless of err state, we should attempt to rollback the transaction // in checksumTxns. They are likely holding metadata locks, which will block // further operations like cleanup or cut-over. - if err := c.trxPool.Close(); err != nil { + if err := c.rrConnPool.Close(); err != nil { return err } if err1 != nil { diff --git a/pkg/dbconn/connpool.go b/pkg/dbconn/connpool.go index 8767602f..eedc87bc 100644 --- a/pkg/dbconn/connpool.go +++ b/pkg/dbconn/connpool.go @@ -16,6 +16,32 @@ type ConnPool struct { conns []*sql.Conn } +// NewRRConnPool creates a pool of transactions which have already +// had their read-view created in REPEATABLE READ isolation. +func NewRRConnPool(ctx context.Context, db *sql.DB, count int, config *DBConfig) (*ConnPool, error) { + checksumTxns := make([]*sql.Conn, 0, count) + for i := 0; i < count; i++ { + conn, err := db.Conn(ctx) + if err != nil { + return nil, err + } + _, err = conn.ExecContext(ctx, "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ") + if err != nil { + return nil, err + } + _, err = conn.ExecContext(ctx, "START TRANSACTION WITH CONSISTENT SNAPSHOT") + if err != nil { + return nil, err + } + // Set SQL mode, charset, etc. + if err := standardizeConn(ctx, conn, config); err != nil { + return nil, err + } + checksumTxns = append(checksumTxns, conn) + } + return &ConnPool{conns: checksumTxns, config: config}, nil +} + // NewConnPool creates a pool of connections which have already // been standardised. func NewConnPool(ctx context.Context, db *sql.DB, count int, config *DBConfig) (*ConnPool, error) { diff --git a/pkg/dbconn/connpool_test.go b/pkg/dbconn/connpool_test.go new file mode 100644 index 00000000..932699f6 --- /dev/null +++ b/pkg/dbconn/connpool_test.go @@ -0,0 +1,81 @@ +package dbconn + +import ( + "context" + "database/sql" + "github.com/stretchr/testify/assert" + "sync" + "testing" + "time" +) + +func TestRetryableTrx(t *testing.T) { + db, err := sql.Open("mysql", dsn()) + assert.NoError(t, err) + defer db.Close() + config := NewDBConfig() + + pool, err := NewConnPool(context.TODO(), db, 2, NewDBConfig()) + defer pool.Close() + assert.NoError(t, err) + + err = DBExec(context.Background(), db, config, "DROP TABLE IF EXISTS test.dbexec") + assert.NoError(t, err) + err = DBExec(context.Background(), db, config, "CREATE TABLE test.dbexec (id INT NOT NULL PRIMARY KEY, colb int)") + assert.NoError(t, err) + + stmts := []string{ + "INSERT INTO test.dbexec (id, colb) VALUES (1, 1)", + "", // test empty + "INSERT INTO test.dbexec (id, colb) VALUES (2, 2)", + } + _, err = pool.RetryableTransaction(context.Background(), true, stmts...) + assert.NoError(t, err) + + _, err = pool.RetryableTransaction(context.Background(), true, "INSERT INTO test.dbexec (id, colb) VALUES (2, 2)") // duplicate + assert.Error(t, err) + + // duplicate, but creates a warning. Ignore duplicate warnings set to true. + _, err = pool.RetryableTransaction(context.Background(), true, "INSERT IGNORE INTO test.dbexec (id, colb) VALUES (2, 2)") + assert.NoError(t, err) + + // duplicate, but warning not ignored + _, err = pool.RetryableTransaction(context.Background(), false, "INSERT IGNORE INTO test.dbexec (id, colb) VALUES (2, 2)") + assert.Error(t, err) + + // start a transaction, acquire a lock for long enough that the first attempt times out + // but a retry is successful. + config.InnodbLockWaitTimeout = 1 + trx, err := db.Begin() + assert.NoError(t, err) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + _, err = trx.Exec("SELECT * FROM test.dbexec WHERE id = 1 FOR UPDATE") + assert.NoError(t, err) + wg.Done() + time.Sleep(2 * time.Second) + err = trx.Rollback() + assert.NoError(t, err) + }() + wg.Wait() + _, err = pool.RetryableTransaction(context.Background(), false, "UPDATE test.dbexec SET colb=123 WHERE id = 1") + assert.NoError(t, err) + + // Same again, but make the retry unsuccessful + config.InnodbLockWaitTimeout = 1 + config.MaxRetries = 2 + trx, err = db.Begin() + assert.NoError(t, err) + wg.Add(1) + go func() { + _, err = trx.Exec("SELECT * FROM test.dbexec WHERE id = 2 FOR UPDATE") + assert.NoError(t, err) + wg.Done() + }() + wg.Wait() + _, err = pool.RetryableTransaction(context.Background(), false, "UPDATE test.dbexec SET colb=123 WHERE id = 2") // this will fail, since it times out and exhausts retries. + assert.Error(t, err) + err = trx.Rollback() // now we can rollback. + assert.NoError(t, err) +} diff --git a/pkg/dbconn/dbconn.go b/pkg/dbconn/dbconn.go index 9c047177..46d5c7eb 100644 --- a/pkg/dbconn/dbconn.go +++ b/pkg/dbconn/dbconn.go @@ -4,12 +4,10 @@ package dbconn import ( "context" "database/sql" - "fmt" "math/rand" "time" "github.com/go-sql-driver/mysql" - "github.com/squareup/spirit/pkg/utils" ) const ( @@ -116,97 +114,6 @@ func canRetryError(err error) bool { } } -// RetryableTransaction retries all statements in a transaction, retrying if a statement -// errors, or there is a deadlock. It will retry up to maxRetries times. -func RetryableTransaction(ctx context.Context, db *sql.DB, ignoreDupKeyWarnings bool, config *DBConfig, stmts ...string) (int64, error) { - var err error - var trx *sql.Tx - var rowsAffected int64 -RETRYLOOP: - for i := 0; i < config.MaxRetries; i++ { - // Start a transaction - if trx, err = db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelReadCommitted}); err != nil { - backoff(i) - continue RETRYLOOP // retry - } - // Standardize it. - if err = standardizeTrx(ctx, trx, config); err != nil { - utils.ErrInErr(trx.Rollback()) // Rollback - backoff(i) - continue RETRYLOOP // retry - } - // Execute all statements. - for _, stmt := range stmts { - if stmt == "" { - continue - } - var res sql.Result - if res, err = trx.ExecContext(ctx, stmt); err != nil { - if canRetryError(err) { - utils.ErrInErr(trx.Rollback()) // Rollback - backoff(i) - continue RETRYLOOP // retry - } - utils.ErrInErr(trx.Rollback()) // Rollback - return rowsAffected, err - } - // Even though there was no ERROR we still need to inspect SHOW WARNINGS - // This is because many of the statements use INSERT IGNORE. - warningRes, err := trx.QueryContext(ctx, "SHOW WARNINGS") //nolint: execinquery - if err != nil { - utils.ErrInErr(trx.Rollback()) // Rollback - return rowsAffected, err - } - defer warningRes.Close() - var level, code, message string - for warningRes.Next() { - err = warningRes.Scan(&level, &code, &message) - if err != nil { - utils.ErrInErr(trx.Rollback()) // Rollback - return rowsAffected, err - } - // We won't receive out of range warnings (1264) - // because the SQL mode has been unset. This is important - // because a historical value like 0000-00-00 00:00:00 - // might exist in the table and needs to be copied. - if code == "1062" && ignoreDupKeyWarnings { - continue // ignore duplicate key warnings - } else if code == "3170" { - // ER_CAPACITY_EXCEEDED - // "Memory capacity of 8388608 bytes for 'range_optimizer_max_mem_size' exceeded. - // Range optimization was not done for this query." - // i.e. the query still executes it just doesn't optimize perfectly - continue - } else { - utils.ErrInErr(trx.Rollback()) - return rowsAffected, fmt.Errorf("unsafe warning migrating chunk: %s, query: %s", message, stmt) - } - } - // As long as it is a statement that supports affected rows (err == nil) - // Get the number of rows affected and add it to the total balance. - count, err := res.RowsAffected() - if err == nil { // supported - rowsAffected += count - } - } - if err != nil { - utils.ErrInErr(trx.Rollback()) // Rollback - backoff(i) - continue RETRYLOOP - } - // Commit it. - if err = trx.Commit(); err != nil { - utils.ErrInErr(trx.Rollback()) - backoff(i) - continue RETRYLOOP - } - // Success! - return rowsAffected, nil - } - // We failed too many times, return the last error - return rowsAffected, err -} - // backoff sleeps a few milliseconds before retrying. func backoff(i int) { randFactor := i * rand.Intn(10) * int(time.Millisecond) diff --git a/pkg/dbconn/dbconn_test.go b/pkg/dbconn/dbconn_test.go index 5443d7f3..4e519eab 100644 --- a/pkg/dbconn/dbconn_test.go +++ b/pkg/dbconn/dbconn_test.go @@ -4,12 +4,9 @@ import ( "context" "database/sql" "fmt" + "github.com/stretchr/testify/assert" "os" - "sync" "testing" - "time" - - "github.com/stretchr/testify/assert" ) func dsn() string { @@ -66,69 +63,3 @@ func TestLockWaitTimeouts(t *testing.T) { assert.NoError(t, err) assert.Equal(t, fmt.Sprint(config.InnodbLockWaitTimeout), mysqlVar) } - -func TestRetryableTrx(t *testing.T) { - db, err := sql.Open("mysql", dsn()) - assert.NoError(t, err) - defer db.Close() - config := NewDBConfig() - err = DBExec(context.Background(), db, config, "DROP TABLE IF EXISTS test.dbexec") - assert.NoError(t, err) - err = DBExec(context.Background(), db, config, "CREATE TABLE test.dbexec (id INT NOT NULL PRIMARY KEY, colb int)") - assert.NoError(t, err) - - stmts := []string{ - "INSERT INTO test.dbexec (id, colb) VALUES (1, 1)", - "", // test empty - "INSERT INTO test.dbexec (id, colb) VALUES (2, 2)", - } - _, err = RetryableTransaction(context.Background(), db, true, NewDBConfig(), stmts...) - assert.NoError(t, err) - - _, err = RetryableTransaction(context.Background(), db, true, NewDBConfig(), "INSERT INTO test.dbexec (id, colb) VALUES (2, 2)") // duplicate - assert.Error(t, err) - - // duplicate, but creates a warning. Ignore duplicate warnings set to true. - _, err = RetryableTransaction(context.Background(), db, true, NewDBConfig(), "INSERT IGNORE INTO test.dbexec (id, colb) VALUES (2, 2)") - assert.NoError(t, err) - - // duplicate, but warning not ignored - _, err = RetryableTransaction(context.Background(), db, false, NewDBConfig(), "INSERT IGNORE INTO test.dbexec (id, colb) VALUES (2, 2)") - assert.Error(t, err) - - // start a transaction, acquire a lock for long enough that the first attempt times out - // but a retry is successful. - config.InnodbLockWaitTimeout = 1 - trx, err := db.Begin() - assert.NoError(t, err) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - _, err = trx.Exec("SELECT * FROM test.dbexec WHERE id = 1 FOR UPDATE") - assert.NoError(t, err) - wg.Done() - time.Sleep(2 * time.Second) - err = trx.Rollback() - assert.NoError(t, err) - }() - wg.Wait() - _, err = RetryableTransaction(context.Background(), db, false, config, "UPDATE test.dbexec SET colb=123 WHERE id = 1") - assert.NoError(t, err) - - // Same again, but make the retry unsuccessful - config.InnodbLockWaitTimeout = 1 - config.MaxRetries = 2 - trx, err = db.Begin() - assert.NoError(t, err) - wg.Add(1) - go func() { - _, err = trx.Exec("SELECT * FROM test.dbexec WHERE id = 2 FOR UPDATE") - assert.NoError(t, err) - wg.Done() - }() - wg.Wait() - _, err = RetryableTransaction(context.Background(), db, false, config, "UPDATE test.dbexec SET colb=123 WHERE id = 2") // this will fail, since it times out and exhausts retries. - assert.Error(t, err) - err = trx.Rollback() // now we can rollback. - assert.NoError(t, err) -} diff --git a/pkg/dbconn/trxpool.go b/pkg/dbconn/trxpool.go deleted file mode 100644 index ec4c18f4..00000000 --- a/pkg/dbconn/trxpool.go +++ /dev/null @@ -1,67 +0,0 @@ -package dbconn - -import ( - "context" - "database/sql" - "errors" - "sync" -) - -// Maybe there is a better way to do this. For the CHECKSUM algorithm we need -// not a set of DB connections, but a set of transactions which have all -// had a read-view created at a certain point in time. So we pre-create -// them in newTrxPool() under a mutex, and then have a simple Get() and Put() -// which is used by worker threads. - -type TrxPool struct { - sync.Mutex - trxs []*sql.Tx -} - -// NewTrxPool creates a pool of transactions which have already -// had their read-view created in REPEATABLE READ isolation. -func NewTrxPool(ctx context.Context, db *sql.DB, count int, config *DBConfig) (*TrxPool, error) { - checksumTxns := make([]*sql.Tx, 0, count) - for i := 0; i < count; i++ { - trx, _ := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelRepeatableRead}) - _, err := trx.ExecContext(ctx, "START TRANSACTION WITH CONSISTENT SNAPSHOT") - if err != nil { - return nil, err - } - // Set SQL mode, charset, etc. - if err := standardizeTrx(ctx, trx, config); err != nil { - return nil, err - } - checksumTxns = append(checksumTxns, trx) - } - return &TrxPool{trxs: checksumTxns}, nil -} - -// Get gets a transaction from the pool. -func (p *TrxPool) Get() (*sql.Tx, error) { - p.Lock() - defer p.Unlock() - if len(p.trxs) == 0 { - return nil, errors.New("no transactions in pool") - } - trx := p.trxs[0] - p.trxs = p.trxs[1:] - return trx, nil -} - -// Put puts a transaction back in the pool. -func (p *TrxPool) Put(trx *sql.Tx) { - p.Lock() - defer p.Unlock() - p.trxs = append(p.trxs, trx) -} - -// Close closes all transactions in the pool. -func (p *TrxPool) Close() error { - for _, trx := range p.trxs { - if err := trx.Rollback(); err != nil { - return err - } - } - return nil -} diff --git a/pkg/dbconn/trxpool_test.go b/pkg/dbconn/trxpool_test.go deleted file mode 100644 index aa8036b3..00000000 --- a/pkg/dbconn/trxpool_test.go +++ /dev/null @@ -1,59 +0,0 @@ -package dbconn - -import ( - "context" - "database/sql" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestTrxPool(t *testing.T) { - db, err := sql.Open("mysql", dsn()) - assert.NoError(t, err) - defer db.Close() - config := NewDBConfig() - config.LockWaitTimeout = 10 - err = DBExec(context.Background(), db, config, "DROP TABLE IF EXISTS test.trxpool") - assert.NoError(t, err) - err = DBExec(context.Background(), db, config, "CREATE TABLE test.trxpool (id INT NOT NULL PRIMARY KEY, colb int)") - assert.NoError(t, err) - - stmts := []string{ - "INSERT INTO test.trxpool (id, colb) VALUES (1, 1)", - "INSERT INTO test.trxpool (id, colb) VALUES (2, 2)", - } - _, err = RetryableTransaction(context.Background(), db, true, config, stmts...) - assert.NoError(t, err) - - // Test that the transaction pool is working. - pool, err := NewTrxPool(context.Background(), db, 2, config) - assert.NoError(t, err) - - // The pool is all repeatable-read transactions, so if I insert new rows - // They can't be visible. - _, err = RetryableTransaction(context.Background(), db, true, config, "INSERT INTO test.trxpool (id, colb) VALUES (3, 3)") - assert.NoError(t, err) - - trx1, err := pool.Get() - assert.NoError(t, err) - trx2, err := pool.Get() - assert.NoError(t, err) - var count int - err = trx1.QueryRow("SELECT COUNT(*) FROM test.trxpool WHERE id = 3").Scan(&count) - assert.NoError(t, err) - assert.Equal(t, 0, count) - err = trx2.QueryRow("SELECT COUNT(*) FROM test.trxpool WHERE id = 3").Scan(&count) - assert.NoError(t, err) - assert.Equal(t, 0, count) - - _, err = pool.Get() - assert.Error(t, err) // no trx in the pool - - pool.Put(trx1) - trx3, err := pool.Get() - assert.NoError(t, err) - pool.Put(trx3) - - assert.NoError(t, pool.Close()) -} diff --git a/pkg/migration/runner.go b/pkg/migration/runner.go index 0e8e8a0f..ea18329d 100644 --- a/pkg/migration/runner.go +++ b/pkg/migration/runner.go @@ -550,7 +550,7 @@ func (r *Runner) postCutoverCheck(ctx context.Context) error { if err != nil { return err } - trxPool, err := dbconn.NewTrxPool(ctx, r.db, r.migration.Threads, r.dbConfig) + rrConnPool, err := dbconn.NewRRConnPool(ctx, r.db, r.migration.Threads, r.dbConfig) if err != nil { return err } @@ -573,7 +573,7 @@ func (r *Runner) postCutoverCheck(ctx context.Context) error { Inclusive: true, }, } - if err := checker.ChecksumChunk(trxPool, chunk); err != nil { + if err := checker.ChecksumChunk(rrConnPool, chunk); err != nil { r.logger.Error("differences found! This does not guarantee corruption since there is a brief race, but it is a good idea to investigate.") debug1 := fmt.Sprintf("SELECT * FROM %s WHERE %s ORDER BY %s", oldTable.QuotedName,