Skip to content

Commit

Permalink
Reusing connection pool for repeatable read connections too
Browse files Browse the repository at this point in the history
  • Loading branch information
prudhvi committed Sep 5, 2023
1 parent 2839e49 commit 30aa76c
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 300 deletions.
18 changes: 9 additions & 9 deletions pkg/checksum/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Checker struct {
concurrency int
feed *repl.Client
db *sql.DB
trxPool *dbconn.TrxPool
rrConnPool *dbconn.ConnPool
isInvalid bool
chunker table.Chunker
StartTime time.Time
Expand Down Expand Up @@ -82,13 +82,13 @@ func NewChecker(db *sql.DB, tbl, newTable *table.TableInfo, feed *repl.Client, c
return checksum, nil
}

func (c *Checker) ChecksumChunk(trxPool *dbconn.TrxPool, chunk *table.Chunk) error {
func (c *Checker) ChecksumChunk(rrConnPool *dbconn.ConnPool, chunk *table.Chunk) error {
startTime := time.Now()
trx, err := trxPool.Get()
rrConn, err := rrConnPool.Get()
if err != nil {
return err
}
defer trxPool.Put(trx)
defer rrConnPool.Put(rrConn)
c.logger.Debugf("checksumming chunk: %s", chunk.String())
source := fmt.Sprintf("SELECT BIT_XOR(CRC32(CONCAT(%s))) as checksum FROM %s WHERE %s",
c.intersectColumns(),
Expand All @@ -101,11 +101,11 @@ func (c *Checker) ChecksumChunk(trxPool *dbconn.TrxPool, chunk *table.Chunk) err
chunk.String(),
)
var sourceChecksum, targetChecksum int64
err = trx.QueryRow(source).Scan(&sourceChecksum)
err = rrConn.QueryRowContext(context.TODO(), source).Scan(&sourceChecksum)
if err != nil {
return err
}
err = trx.QueryRow(target).Scan(&targetChecksum)
err = rrConn.QueryRowContext(context.TODO(), target).Scan(&targetChecksum)
if err != nil {
return err
}
Expand Down Expand Up @@ -184,7 +184,7 @@ func (c *Checker) Run(ctx context.Context) error {
// The table. They MUST be created before the lock is released
// with REPEATABLE-READ and a consistent snapshot (or dummy read)
// to initialize the read-view.
c.trxPool, err = dbconn.NewTrxPool(ctx, c.db, c.concurrency, c.dbConfig)
c.rrConnPool, err = dbconn.NewRRConnPool(ctx, c.db, c.concurrency, c.dbConfig)
if err != nil {
return err
}
Expand Down Expand Up @@ -222,7 +222,7 @@ func (c *Checker) Run(ctx context.Context) error {
c.isInvalid = true
return err
}
if err := c.ChecksumChunk(c.trxPool, chunk); err != nil {
if err := c.ChecksumChunk(c.rrConnPool, chunk); err != nil {
c.isInvalid = true
return err
}
Expand All @@ -234,7 +234,7 @@ func (c *Checker) Run(ctx context.Context) error {
// Regardless of err state, we should attempt to rollback the transaction
// in checksumTxns. They are likely holding metadata locks, which will block
// further operations like cleanup or cut-over.
if err := c.trxPool.Close(); err != nil {
if err := c.rrConnPool.Close(); err != nil {
return err
}
if err1 != nil {
Expand Down
26 changes: 26 additions & 0 deletions pkg/dbconn/connpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,32 @@ type ConnPool struct {
conns []*sql.Conn
}

// NewRRConnPool creates a pool of transactions which have already
// had their read-view created in REPEATABLE READ isolation.
func NewRRConnPool(ctx context.Context, db *sql.DB, count int, config *DBConfig) (*ConnPool, error) {
checksumTxns := make([]*sql.Conn, 0, count)
for i := 0; i < count; i++ {
conn, err := db.Conn(ctx)
if err != nil {
return nil, err
}
_, err = conn.ExecContext(ctx, "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
if err != nil {
return nil, err
}
_, err = conn.ExecContext(ctx, "START TRANSACTION WITH CONSISTENT SNAPSHOT")
if err != nil {
return nil, err
}
// Set SQL mode, charset, etc.
if err := standardizeConn(ctx, conn, config); err != nil {
return nil, err
}
checksumTxns = append(checksumTxns, conn)
}
return &ConnPool{conns: checksumTxns, config: config}, nil
}

// NewConnPool creates a pool of connections which have already
// been standardised.
func NewConnPool(ctx context.Context, db *sql.DB, count int, config *DBConfig) (*ConnPool, error) {
Expand Down
81 changes: 81 additions & 0 deletions pkg/dbconn/connpool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package dbconn

import (
"context"
"database/sql"
"github.com/stretchr/testify/assert"
"sync"
"testing"
"time"
)

func TestRetryableTrx(t *testing.T) {
db, err := sql.Open("mysql", dsn())
assert.NoError(t, err)
defer db.Close()
config := NewDBConfig()

pool, err := NewConnPool(context.TODO(), db, 2, NewDBConfig())
defer pool.Close()
assert.NoError(t, err)

err = DBExec(context.Background(), db, config, "DROP TABLE IF EXISTS test.dbexec")
assert.NoError(t, err)
err = DBExec(context.Background(), db, config, "CREATE TABLE test.dbexec (id INT NOT NULL PRIMARY KEY, colb int)")
assert.NoError(t, err)

stmts := []string{
"INSERT INTO test.dbexec (id, colb) VALUES (1, 1)",
"", // test empty
"INSERT INTO test.dbexec (id, colb) VALUES (2, 2)",
}
_, err = pool.RetryableTransaction(context.Background(), true, stmts...)
assert.NoError(t, err)

_, err = pool.RetryableTransaction(context.Background(), true, "INSERT INTO test.dbexec (id, colb) VALUES (2, 2)") // duplicate
assert.Error(t, err)

// duplicate, but creates a warning. Ignore duplicate warnings set to true.
_, err = pool.RetryableTransaction(context.Background(), true, "INSERT IGNORE INTO test.dbexec (id, colb) VALUES (2, 2)")
assert.NoError(t, err)

// duplicate, but warning not ignored
_, err = pool.RetryableTransaction(context.Background(), false, "INSERT IGNORE INTO test.dbexec (id, colb) VALUES (2, 2)")
assert.Error(t, err)

// start a transaction, acquire a lock for long enough that the first attempt times out
// but a retry is successful.
config.InnodbLockWaitTimeout = 1
trx, err := db.Begin()
assert.NoError(t, err)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
_, err = trx.Exec("SELECT * FROM test.dbexec WHERE id = 1 FOR UPDATE")
assert.NoError(t, err)
wg.Done()
time.Sleep(2 * time.Second)
err = trx.Rollback()
assert.NoError(t, err)
}()
wg.Wait()
_, err = pool.RetryableTransaction(context.Background(), false, "UPDATE test.dbexec SET colb=123 WHERE id = 1")
assert.NoError(t, err)

// Same again, but make the retry unsuccessful
config.InnodbLockWaitTimeout = 1
config.MaxRetries = 2
trx, err = db.Begin()
assert.NoError(t, err)
wg.Add(1)
go func() {
_, err = trx.Exec("SELECT * FROM test.dbexec WHERE id = 2 FOR UPDATE")
assert.NoError(t, err)
wg.Done()
}()
wg.Wait()
_, err = pool.RetryableTransaction(context.Background(), false, "UPDATE test.dbexec SET colb=123 WHERE id = 2") // this will fail, since it times out and exhausts retries.
assert.Error(t, err)
err = trx.Rollback() // now we can rollback.
assert.NoError(t, err)
}
93 changes: 0 additions & 93 deletions pkg/dbconn/dbconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@ package dbconn
import (
"context"
"database/sql"
"fmt"
"math/rand"
"time"

"github.com/go-sql-driver/mysql"
"github.com/squareup/spirit/pkg/utils"
)

const (
Expand Down Expand Up @@ -116,97 +114,6 @@ func canRetryError(err error) bool {
}
}

// RetryableTransaction retries all statements in a transaction, retrying if a statement
// errors, or there is a deadlock. It will retry up to maxRetries times.
func RetryableTransaction(ctx context.Context, db *sql.DB, ignoreDupKeyWarnings bool, config *DBConfig, stmts ...string) (int64, error) {
var err error
var trx *sql.Tx
var rowsAffected int64
RETRYLOOP:
for i := 0; i < config.MaxRetries; i++ {
// Start a transaction
if trx, err = db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelReadCommitted}); err != nil {
backoff(i)
continue RETRYLOOP // retry
}
// Standardize it.
if err = standardizeTrx(ctx, trx, config); err != nil {
utils.ErrInErr(trx.Rollback()) // Rollback
backoff(i)
continue RETRYLOOP // retry
}
// Execute all statements.
for _, stmt := range stmts {
if stmt == "" {
continue
}
var res sql.Result
if res, err = trx.ExecContext(ctx, stmt); err != nil {
if canRetryError(err) {
utils.ErrInErr(trx.Rollback()) // Rollback
backoff(i)
continue RETRYLOOP // retry
}
utils.ErrInErr(trx.Rollback()) // Rollback
return rowsAffected, err
}
// Even though there was no ERROR we still need to inspect SHOW WARNINGS
// This is because many of the statements use INSERT IGNORE.
warningRes, err := trx.QueryContext(ctx, "SHOW WARNINGS") //nolint: execinquery
if err != nil {
utils.ErrInErr(trx.Rollback()) // Rollback
return rowsAffected, err
}
defer warningRes.Close()
var level, code, message string
for warningRes.Next() {
err = warningRes.Scan(&level, &code, &message)
if err != nil {
utils.ErrInErr(trx.Rollback()) // Rollback
return rowsAffected, err
}
// We won't receive out of range warnings (1264)
// because the SQL mode has been unset. This is important
// because a historical value like 0000-00-00 00:00:00
// might exist in the table and needs to be copied.
if code == "1062" && ignoreDupKeyWarnings {
continue // ignore duplicate key warnings
} else if code == "3170" {
// ER_CAPACITY_EXCEEDED
// "Memory capacity of 8388608 bytes for 'range_optimizer_max_mem_size' exceeded.
// Range optimization was not done for this query."
// i.e. the query still executes it just doesn't optimize perfectly
continue
} else {
utils.ErrInErr(trx.Rollback())
return rowsAffected, fmt.Errorf("unsafe warning migrating chunk: %s, query: %s", message, stmt)
}
}
// As long as it is a statement that supports affected rows (err == nil)
// Get the number of rows affected and add it to the total balance.
count, err := res.RowsAffected()
if err == nil { // supported
rowsAffected += count
}
}
if err != nil {
utils.ErrInErr(trx.Rollback()) // Rollback
backoff(i)
continue RETRYLOOP
}
// Commit it.
if err = trx.Commit(); err != nil {
utils.ErrInErr(trx.Rollback())
backoff(i)
continue RETRYLOOP
}
// Success!
return rowsAffected, nil
}
// We failed too many times, return the last error
return rowsAffected, err
}

// backoff sleeps a few milliseconds before retrying.
func backoff(i int) {
randFactor := i * rand.Intn(10) * int(time.Millisecond)
Expand Down
71 changes: 1 addition & 70 deletions pkg/dbconn/dbconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@ import (
"context"
"database/sql"
"fmt"
"github.com/stretchr/testify/assert"
"os"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func dsn() string {
Expand Down Expand Up @@ -66,69 +63,3 @@ func TestLockWaitTimeouts(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, fmt.Sprint(config.InnodbLockWaitTimeout), mysqlVar)
}

func TestRetryableTrx(t *testing.T) {
db, err := sql.Open("mysql", dsn())
assert.NoError(t, err)
defer db.Close()
config := NewDBConfig()
err = DBExec(context.Background(), db, config, "DROP TABLE IF EXISTS test.dbexec")
assert.NoError(t, err)
err = DBExec(context.Background(), db, config, "CREATE TABLE test.dbexec (id INT NOT NULL PRIMARY KEY, colb int)")
assert.NoError(t, err)

stmts := []string{
"INSERT INTO test.dbexec (id, colb) VALUES (1, 1)",
"", // test empty
"INSERT INTO test.dbexec (id, colb) VALUES (2, 2)",
}
_, err = RetryableTransaction(context.Background(), db, true, NewDBConfig(), stmts...)
assert.NoError(t, err)

_, err = RetryableTransaction(context.Background(), db, true, NewDBConfig(), "INSERT INTO test.dbexec (id, colb) VALUES (2, 2)") // duplicate
assert.Error(t, err)

// duplicate, but creates a warning. Ignore duplicate warnings set to true.
_, err = RetryableTransaction(context.Background(), db, true, NewDBConfig(), "INSERT IGNORE INTO test.dbexec (id, colb) VALUES (2, 2)")
assert.NoError(t, err)

// duplicate, but warning not ignored
_, err = RetryableTransaction(context.Background(), db, false, NewDBConfig(), "INSERT IGNORE INTO test.dbexec (id, colb) VALUES (2, 2)")
assert.Error(t, err)

// start a transaction, acquire a lock for long enough that the first attempt times out
// but a retry is successful.
config.InnodbLockWaitTimeout = 1
trx, err := db.Begin()
assert.NoError(t, err)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
_, err = trx.Exec("SELECT * FROM test.dbexec WHERE id = 1 FOR UPDATE")
assert.NoError(t, err)
wg.Done()
time.Sleep(2 * time.Second)
err = trx.Rollback()
assert.NoError(t, err)
}()
wg.Wait()
_, err = RetryableTransaction(context.Background(), db, false, config, "UPDATE test.dbexec SET colb=123 WHERE id = 1")
assert.NoError(t, err)

// Same again, but make the retry unsuccessful
config.InnodbLockWaitTimeout = 1
config.MaxRetries = 2
trx, err = db.Begin()
assert.NoError(t, err)
wg.Add(1)
go func() {
_, err = trx.Exec("SELECT * FROM test.dbexec WHERE id = 2 FOR UPDATE")
assert.NoError(t, err)
wg.Done()
}()
wg.Wait()
_, err = RetryableTransaction(context.Background(), db, false, config, "UPDATE test.dbexec SET colb=123 WHERE id = 2") // this will fail, since it times out and exhausts retries.
assert.Error(t, err)
err = trx.Rollback() // now we can rollback.
assert.NoError(t, err)
}
Loading

0 comments on commit 30aa76c

Please sign in to comment.