From 0a0d1f4848eda7d39119ed69eaed27ee5dc30788 Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Sun, 20 Oct 2024 10:14:15 +0000 Subject: [PATCH 1/5] Do not lock rows when copying. --- go/logic/applier.go | 1 - go/sql/builder.go | 13 ++++--------- go/sql/builder_test.go | 15 +++++++-------- 3 files changed, 11 insertions(+), 18 deletions(-) diff --git a/go/logic/applier.go b/go/logic/applier.go index 990fbe720..ae05d19a6 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -630,7 +630,6 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(), this.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(), this.migrationContext.GetIteration() == 0, - this.migrationContext.IsTransactionalTable(), ) if err != nil { return chunkSize, rowsAffected, duration, err diff --git a/go/sql/builder.go b/go/sql/builder.go index 7be428f93..e73bedb9d 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -182,7 +182,7 @@ func BuildRangePreparedComparison(columns *ColumnList, args []interface{}, compa return BuildRangeComparison(columns.Names(), values, args, comparisonSign) } -func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartValues, rangeEndValues []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool) (result string, explodedArgs []interface{}, err error) { +func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartValues, rangeEndValues []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool) (result string, explodedArgs []interface{}, err error) { if len(sharedColumns) == 0 { return "", explodedArgs, fmt.Errorf("Got 0 shared columns in BuildRangeInsertQuery") } @@ -217,10 +217,6 @@ func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName strin return "", explodedArgs, err } explodedArgs = append(explodedArgs, rangeExplodedArgs...) - transactionalClause := "" - if transactionalTable { - transactionalClause = "lock in share mode" - } result = fmt.Sprintf(` insert /* gh-ost %s.%s */ ignore into @@ -233,18 +229,17 @@ func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName strin force index (%s) where (%s and %s) - %s )`, databaseName, originalTableName, databaseName, ghostTableName, mappedSharedColumnsListing, sharedColumnsListing, databaseName, originalTableName, uniqueKey, - rangeStartComparison, rangeEndComparison, transactionalClause) + rangeStartComparison, rangeEndComparison) return result, explodedArgs, nil } -func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool) (result string, explodedArgs []interface{}, err error) { +func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool) (result string, explodedArgs []interface{}, err error) { rangeStartValues := buildColumnsPreparedValues(uniqueKeyColumns) rangeEndValues := buildColumnsPreparedValues(uniqueKeyColumns) - return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable) + return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues) } func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) { diff --git a/go/sql/builder_test.go b/go/sql/builder_test.go index 574e8bb1b..5929e94a2 100644 --- a/go/sql/builder_test.go +++ b/go/sql/builder_test.go @@ -172,7 +172,7 @@ func TestBuildRangeInsertQuery(t *testing.T) { rangeStartArgs := []interface{}{3} rangeEndArgs := []interface{}{103} - query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) + query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true) test.S(t).ExpectNil(err) expected := ` insert /* gh-ost mydb.tbl */ ignore @@ -199,7 +199,7 @@ func TestBuildRangeInsertQuery(t *testing.T) { rangeStartArgs := []interface{}{3, 17} rangeEndArgs := []interface{}{103, 117} - query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) + query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true) test.S(t).ExpectNil(err) expected := ` insert /* gh-ost mydb.tbl */ ignore @@ -240,14 +240,14 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { rangeStartArgs := []interface{}{3} rangeEndArgs := []interface{}{103} - query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) + query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true) test.S(t).ExpectNil(err) expected := ` insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, location) - ( + ( select id, name, position from mydb.tbl @@ -268,7 +268,7 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { rangeStartArgs := []interface{}{3, 17} rangeEndArgs := []interface{}{103, 117} - query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) + query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true) test.S(t).ExpectNil(err) expected := ` insert /* gh-ost mydb.tbl */ ignore @@ -302,7 +302,7 @@ func TestBuildRangeInsertPreparedQuery(t *testing.T) { rangeStartArgs := []interface{}{3, 17} rangeEndArgs := []interface{}{103, 117} - query, explodedArgs, err := BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, true, true) + query, explodedArgs, err := BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, true) test.S(t).ExpectNil(err) expected := ` insert /* gh-ost mydb.tbl */ ignore @@ -315,7 +315,6 @@ func TestBuildRangeInsertPreparedQuery(t *testing.T) { mydb.tbl force index (name_position_uidx) where (((name > ?) or (((name = ?)) AND (position > ?)) or ((name = ?) and (position = ?))) and ((name < ?) or (((name = ?)) AND (position < ?)) or ((name = ?) and (position = ?)))) - lock in share mode )` test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected)) test.S(t).ExpectTrue(reflect.DeepEqual(explodedArgs, []interface{}{3, 3, 17, 3, 17, 103, 103, 117, 103, 117})) @@ -335,7 +334,7 @@ func TestBuildUniqueKeyRangeEndPreparedQuery(t *testing.T) { test.S(t).ExpectNil(err) expected := ` select /* gh-ost mydb.tbl test */ name, position - from ( + from ( select name, position from From 16a88d8425e2bfd2c688919c77be360947f97ebb Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Tue, 22 Oct 2024 18:56:26 +0000 Subject: [PATCH 2/5] Add `for share nowait` option. --- go/logic/applier_test.go | 186 ++++++++++++++++++++++++++++++++++----- go/sql/builder.go | 1 + go/sql/builder_test.go | 5 ++ 3 files changed, 172 insertions(+), 20 deletions(-) diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index e9426bbc5..5cddaa3e4 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -10,6 +10,7 @@ import ( gosql "database/sql" "strings" "testing" + "time" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -199,6 +200,30 @@ type ApplierTestSuite struct { mysqlContainer testcontainers.Container } +func (suite *ApplierTestSuite) getConnectionConfig(ctx context.Context) (*mysql.ConnectionConfig, error) { + host, err := suite.mysqlContainer.ContainerIP(ctx) + if err != nil { + return nil, err + } + + config := mysql.NewConnectionConfig() + config.Key.Hostname = host + config.Key.Port = 3306 + config.User = "root" + config.Password = "root-password" + + return config, nil +} + +func (suite *ApplierTestSuite) getDb(ctx context.Context) (*gosql.DB, error) { + host, err := suite.mysqlContainer.ContainerIP(ctx) + if err != nil { + return nil, err + } + + return gosql.Open("mysql", "root:root-password@tcp("+host+":3306)/test") +} + func (suite *ApplierTestSuite) SetupSuite() { ctx := context.Background() req := testcontainers.ContainerRequest{ @@ -229,7 +254,7 @@ func (suite *ApplierTestSuite) SetupTest() { suite.Require().NoError(err) suite.Require().Equalf(0, rc, "failed to created database: expected exit code 0, got %d", rc) - rc, _, err = suite.mysqlContainer.Exec(ctx, []string{"mysql", "-uroot", "-proot-password", "-e", "CREATE TABLE test.testing (id INT, item_id INT);"}) + rc, _, err = suite.mysqlContainer.Exec(ctx, []string{"mysql", "-uroot", "-proot-password", "-e", "CREATE TABLE test.testing (id INT, item_id INT, PRIMARY KEY (id));"}) suite.Require().NoError(err) suite.Require().Equalf(0, rc, "failed to created table: expected exit code 0, got %d", rc) } @@ -245,15 +270,11 @@ func (suite *ApplierTestSuite) TearDownTest() { func (suite *ApplierTestSuite) TestInitDBConnections() { ctx := context.Background() - host, err := suite.mysqlContainer.ContainerIP(ctx) + connectionConfig, err := suite.getConnectionConfig(ctx) suite.Require().NoError(err) migrationContext := base.NewMigrationContext() - migrationContext.ApplierConnectionConfig = mysql.NewConnectionConfig() - migrationContext.ApplierConnectionConfig.Key.Hostname = host - migrationContext.ApplierConnectionConfig.Key.Port = 3306 - migrationContext.ApplierConnectionConfig.User = "root" - migrationContext.ApplierConnectionConfig.Password = "root-password" + migrationContext.ApplierConnectionConfig = connectionConfig migrationContext.DatabaseName = "test" migrationContext.OriginalTableName = "testing" migrationContext.SetConnectionConfig("innodb") @@ -274,15 +295,11 @@ func (suite *ApplierTestSuite) TestInitDBConnections() { func (suite *ApplierTestSuite) TestApplyDMLEventQueries() { ctx := context.Background() - host, err := suite.mysqlContainer.ContainerIP(ctx) + connectionConfig, err := suite.getConnectionConfig(ctx) suite.Require().NoError(err) migrationContext := base.NewMigrationContext() - migrationContext.ApplierConnectionConfig = mysql.NewConnectionConfig() - migrationContext.ApplierConnectionConfig.Key.Hostname = host - migrationContext.ApplierConnectionConfig.Key.Port = 3306 - migrationContext.ApplierConnectionConfig.User = "root" - migrationContext.ApplierConnectionConfig.Password = "root-password" + migrationContext.ApplierConnectionConfig = connectionConfig migrationContext.DatabaseName = "test" migrationContext.OriginalTableName = "testing" migrationContext.SetConnectionConfig("innodb") @@ -313,7 +330,7 @@ func (suite *ApplierTestSuite) TestApplyDMLEventQueries() { suite.Require().NoError(err) // Check that the row was inserted - db, err := gosql.Open("mysql", "root:root-password@tcp("+host+":3306)/test") + db, err := suite.getDb(ctx) suite.Require().NoError(err) defer db.Close() @@ -340,15 +357,11 @@ func (suite *ApplierTestSuite) TestApplyDMLEventQueries() { func (suite *ApplierTestSuite) TestValidateOrDropExistingTables() { ctx := context.Background() - host, err := suite.mysqlContainer.ContainerIP(ctx) + connectionConfig, err := suite.getConnectionConfig(ctx) suite.Require().NoError(err) migrationContext := base.NewMigrationContext() - migrationContext.ApplierConnectionConfig = mysql.NewConnectionConfig() - migrationContext.ApplierConnectionConfig.Key.Hostname = host - migrationContext.ApplierConnectionConfig.Key.Port = 3306 - migrationContext.ApplierConnectionConfig.User = "root" - migrationContext.ApplierConnectionConfig.Password = "root-password" + migrationContext.ApplierConnectionConfig = connectionConfig migrationContext.DatabaseName = "test" migrationContext.OriginalTableName = "testing" migrationContext.SetConnectionConfig("innodb") @@ -367,6 +380,139 @@ func (suite *ApplierTestSuite) TestValidateOrDropExistingTables() { suite.Require().NoError(err) } +func (suite *ApplierTestSuite) TestApplyIterationInsertQuery() { + ctx := context.Background() + + connectionConfig, err := suite.getConnectionConfig(ctx) + suite.Require().NoError(err) + + migrationContext := base.NewMigrationContext() + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.DatabaseName = "test" + migrationContext.OriginalTableName = "testing" + migrationContext.ChunkSize = 10 + migrationContext.SetConnectionConfig("innodb") + + db, err := suite.getDb(ctx) + suite.Require().NoError(err) + defer db.Close() + + _, err = db.Exec("CREATE TABLE test._testing_gho (id INT, item_id INT, PRIMARY KEY (id))") + suite.Require().NoError(err) + + // Insert some test values + for i := 1; i <= 10; i++ { + _, err = db.Exec("INSERT INTO test.testing (id, item_id) VALUES (?, ?)", i, i) + suite.Require().NoError(err) + } + + migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "item_id"}) + migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "item_id"}) + migrationContext.UniqueKey = &sql.UniqueKey{ + Name: "PRIMARY", + Columns: *sql.NewColumnList([]string{"id"}), + } + + migrationContext.MigrationIterationRangeMinValues = sql.ToColumnValues([]interface{}{1}) + migrationContext.MigrationIterationRangeMaxValues = sql.ToColumnValues([]interface{}{10}) + + applier := NewApplier(migrationContext) + defer applier.Teardown() + + err = applier.InitDBConnections() + suite.Require().NoError(err) + + chunkSize, rowsAffected, duration, err := applier.ApplyIterationInsertQuery() + suite.Require().NoError(err) + + suite.Require().Equal(int64(migrationContext.ChunkSize), chunkSize) + suite.Require().Equal(int64(10), rowsAffected) + suite.Require().Greater(duration, time.Duration(0)) + + // Check that the rows were inserted + rows, err := db.Query("SELECT * FROM test._testing_gho") + suite.Require().NoError(err) + defer rows.Close() + + var count, id, item_id int + for rows.Next() { + err = rows.Scan(&id, &item_id) + suite.Require().NoError(err) + count += 1 + } + suite.Require().NoError(rows.Err()) + + suite.Require().Equal(10, count) +} + +func (suite *ApplierTestSuite) TestApplyIterationInsertQueryFailsFastWhenSelectingLockedRows() { + ctx := context.Background() + + connectionConfig, err := suite.getConnectionConfig(ctx) + suite.Require().NoError(err) + + migrationContext := base.NewMigrationContext() + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.DatabaseName = "test" + migrationContext.OriginalTableName = "testing" + migrationContext.ChunkSize = 10 + migrationContext.SetConnectionConfig("innodb") + + db, err := suite.getDb(ctx) + suite.Require().NoError(err) + defer db.Close() + + _, err = db.Exec("CREATE TABLE test._testing_gho (id INT, item_id INT, PRIMARY KEY (id))") + suite.Require().NoError(err) + + // Insert some test values + for i := 1; i <= 10; i++ { + _, err = db.Exec("INSERT INTO test.testing (id, item_id) VALUES (?, ?)", i, i) + suite.Require().NoError(err) + } + + migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "item_id"}) + migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "item_id"}) + migrationContext.UniqueKey = &sql.UniqueKey{ + Name: "PRIMARY", + Columns: *sql.NewColumnList([]string{"id"}), + } + + migrationContext.MigrationIterationRangeMinValues = sql.ToColumnValues([]interface{}{1}) + migrationContext.MigrationIterationRangeMaxValues = sql.ToColumnValues([]interface{}{10}) + + applier := NewApplier(migrationContext) + defer applier.Teardown() + + err = applier.InitDBConnections() + suite.Require().NoError(err) + + // Lock one of the rows + tx, err := db.Begin() + suite.Require().NoError(err) + defer func() { + suite.Require().NoError(tx.Rollback()) + }() + + _, err = tx.Exec("SELECT * FROM test.testing WHERE id = 5 FOR UPDATE") + suite.Require().NoError(err) + + chunkSize, rowsAffected, duration, err := applier.ApplyIterationInsertQuery() + suite.Require().Error(err) + suite.Require().EqualError(err, "Error 3572 (HY000): Statement aborted because lock(s) could not be acquired immediately and NOWAIT is set.") + + suite.Require().Equal(int64(migrationContext.ChunkSize), chunkSize) + suite.Require().Equal(int64(0), rowsAffected) + suite.Require().Equal(time.Duration(0), duration) + + // Check that the no rows were inserted + var count int + err = db.QueryRow("SELECT COUNT(*) FROM test._testing_gho").Scan(&count) + suite.Require().NoError(err) + + suite.Require().Equal(0, count) +} + func TestApplier(t *testing.T) { suite.Run(t, new(ApplierTestSuite)) } diff --git a/go/sql/builder.go b/go/sql/builder.go index e73bedb9d..4f6c55389 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -229,6 +229,7 @@ func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName strin force index (%s) where (%s and %s) + for share nowait )`, databaseName, originalTableName, databaseName, ghostTableName, mappedSharedColumnsListing, sharedColumnsListing, databaseName, originalTableName, uniqueKey, diff --git a/go/sql/builder_test.go b/go/sql/builder_test.go index f3eca750e..dc8fb9025 100644 --- a/go/sql/builder_test.go +++ b/go/sql/builder_test.go @@ -186,6 +186,7 @@ func TestBuildRangeInsertQuery(t *testing.T) { where (((id > @v1s) or ((id = @v1s))) and ((id < @v1e) or ((id = @v1e)))) + for share nowait )` require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) require.Equal(t, []interface{}{3, 3, 103, 103}, explodedArgs) @@ -219,6 +220,7 @@ func TestBuildRangeInsertQuery(t *testing.T) { or (((name = @v1e)) AND (position < @v2e)) or ((name = @v1e) and (position = @v2e)))) + for share nowait )` require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) require.Equal(t, []interface{}{3, 3, 17, 3, 17, 103, 103, 117, 103, 117}, explodedArgs) @@ -255,6 +257,7 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { (((id > @v1s) or ((id = @v1s))) and ((id < @v1e) or ((id = @v1e)))) + for share nowait )` require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) require.Equal(t, []interface{}{3, 3, 103, 103}, explodedArgs) @@ -284,6 +287,7 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { AND (position > @v2s)) or ((name = @v1s) and (position = @v2s))) and ((name < @v1e) or (((name = @v1e)) AND (position < @v2e)) or ((name = @v1e) and (position = @v2e)))) + for share nowait )` require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) require.Equal(t, []interface{}{3, 3, 17, 3, 17, 103, 103, 117, 103, 117}, explodedArgs) @@ -314,6 +318,7 @@ func TestBuildRangeInsertPreparedQuery(t *testing.T) { mydb.tbl force index (name_position_uidx) where (((name > ?) or (((name = ?)) AND (position > ?)) or ((name = ?) and (position = ?))) and ((name < ?) or (((name = ?)) AND (position < ?)) or ((name = ?) and (position = ?)))) + for share nowait )` require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) require.Equal(t, []interface{}{3, 3, 17, 3, 17, 103, 103, 117, 103, 117}, explodedArgs) From 095fb267f247d4b7498a7bd0616e64d5fce718e0 Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Tue, 22 Oct 2024 21:07:00 +0000 Subject: [PATCH 3/5] Fix linter issues. --- go/logic/applier_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index 5cddaa3e4..0c6708505 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -425,7 +425,7 @@ func (suite *ApplierTestSuite) TestApplyIterationInsertQuery() { chunkSize, rowsAffected, duration, err := applier.ApplyIterationInsertQuery() suite.Require().NoError(err) - suite.Require().Equal(int64(migrationContext.ChunkSize), chunkSize) + suite.Require().Equal(migrationContext.ChunkSize, chunkSize) suite.Require().Equal(int64(10), rowsAffected) suite.Require().Greater(duration, time.Duration(0)) @@ -501,7 +501,7 @@ func (suite *ApplierTestSuite) TestApplyIterationInsertQueryFailsFastWhenSelecti suite.Require().Error(err) suite.Require().EqualError(err, "Error 3572 (HY000): Statement aborted because lock(s) could not be acquired immediately and NOWAIT is set.") - suite.Require().Equal(int64(migrationContext.ChunkSize), chunkSize) + suite.Require().Equal(migrationContext.ChunkSize, chunkSize) suite.Require().Equal(int64(0), rowsAffected) suite.Require().Equal(time.Duration(0), duration) From a91c386f5cfa65fc4884ade7f26195207e749240 Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Tue, 22 Oct 2024 21:39:22 +0000 Subject: [PATCH 4/5] Only specify `for share nowait` on transactional tables. --- go/logic/applier.go | 1 + go/logic/applier_test.go | 1 + go/sql/builder.go | 14 +++++++++----- go/sql/builder_test.go | 10 +++++----- 4 files changed, 16 insertions(+), 10 deletions(-) diff --git a/go/logic/applier.go b/go/logic/applier.go index ae05d19a6..990fbe720 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -630,6 +630,7 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(), this.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(), this.migrationContext.GetIteration() == 0, + this.migrationContext.IsTransactionalTable(), ) if err != nil { return chunkSize, rowsAffected, duration, err diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index 0c6708505..f424c83e4 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -456,6 +456,7 @@ func (suite *ApplierTestSuite) TestApplyIterationInsertQueryFailsFastWhenSelecti migrationContext.DatabaseName = "test" migrationContext.OriginalTableName = "testing" migrationContext.ChunkSize = 10 + migrationContext.TableEngine = "innodb" migrationContext.SetConnectionConfig("innodb") db, err := suite.getDb(ctx) diff --git a/go/sql/builder.go b/go/sql/builder.go index 4f6c55389..948ecc16e 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -182,7 +182,7 @@ func BuildRangePreparedComparison(columns *ColumnList, args []interface{}, compa return BuildRangeComparison(columns.Names(), values, args, comparisonSign) } -func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartValues, rangeEndValues []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool) (result string, explodedArgs []interface{}, err error) { +func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartValues, rangeEndValues []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool) (result string, explodedArgs []interface{}, err error) { if len(sharedColumns) == 0 { return "", explodedArgs, fmt.Errorf("Got 0 shared columns in BuildRangeInsertQuery") } @@ -212,6 +212,10 @@ func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName strin return "", explodedArgs, err } explodedArgs = append(explodedArgs, rangeExplodedArgs...) + transactionalClause := "" + if transactionalTable { + transactionalClause = "for share nowait" + } rangeEndComparison, rangeExplodedArgs, err := BuildRangeComparison(uniqueKeyColumns.Names(), rangeEndValues, rangeEndArgs, LessThanOrEqualsComparisonSign) if err != nil { return "", explodedArgs, err @@ -229,18 +233,18 @@ func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName strin force index (%s) where (%s and %s) - for share nowait + %s )`, databaseName, originalTableName, databaseName, ghostTableName, mappedSharedColumnsListing, sharedColumnsListing, databaseName, originalTableName, uniqueKey, - rangeStartComparison, rangeEndComparison) + rangeStartComparison, rangeEndComparison, transactionalClause) return result, explodedArgs, nil } -func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool) (result string, explodedArgs []interface{}, err error) { +func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool) (result string, explodedArgs []interface{}, err error) { rangeStartValues := buildColumnsPreparedValues(uniqueKeyColumns) rangeEndValues := buildColumnsPreparedValues(uniqueKeyColumns) - return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues) + return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable) } func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) { diff --git a/go/sql/builder_test.go b/go/sql/builder_test.go index dc8fb9025..5e225ccd3 100644 --- a/go/sql/builder_test.go +++ b/go/sql/builder_test.go @@ -171,7 +171,7 @@ func TestBuildRangeInsertQuery(t *testing.T) { rangeStartArgs := []interface{}{3} rangeEndArgs := []interface{}{103} - query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true) + query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, true) require.NoError(t, err) expected := ` insert /* gh-ost mydb.tbl */ ignore @@ -199,7 +199,7 @@ func TestBuildRangeInsertQuery(t *testing.T) { rangeStartArgs := []interface{}{3, 17} rangeEndArgs := []interface{}{103, 117} - query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true) + query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, true) require.NoError(t, err) expected := ` insert /* gh-ost mydb.tbl */ ignore @@ -241,7 +241,7 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { rangeStartArgs := []interface{}{3} rangeEndArgs := []interface{}{103} - query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true) + query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, true) require.NoError(t, err) expected := ` insert /* gh-ost mydb.tbl */ ignore @@ -270,7 +270,7 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { rangeStartArgs := []interface{}{3, 17} rangeEndArgs := []interface{}{103, 117} - query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true) + query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, true) require.NoError(t, err) expected := ` insert /* gh-ost mydb.tbl */ ignore @@ -305,7 +305,7 @@ func TestBuildRangeInsertPreparedQuery(t *testing.T) { rangeStartArgs := []interface{}{3, 17} rangeEndArgs := []interface{}{103, 117} - query, explodedArgs, err := BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, true) + query, explodedArgs, err := BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, true, true) require.NoError(t, err) expected := ` insert /* gh-ost mydb.tbl */ ignore From 5ddeb21d22ca5c880602617d8a46978bd67ac4ec Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Wed, 23 Oct 2024 12:40:40 +0000 Subject: [PATCH 5/5] Only use `NOWAIT` on MySQL 8. --- go/logic/applier.go | 2 ++ go/sql/builder.go | 12 ++++++++---- go/sql/builder_test.go | 10 +++++----- 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/go/logic/applier.go b/go/logic/applier.go index 990fbe720..9979aa120 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -631,6 +631,8 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected this.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(), this.migrationContext.GetIteration() == 0, this.migrationContext.IsTransactionalTable(), + // TODO: Don't hardcode this + strings.HasPrefix(this.migrationContext.ApplierMySQLVersion, "8."), ) if err != nil { return chunkSize, rowsAffected, duration, err diff --git a/go/sql/builder.go b/go/sql/builder.go index 948ecc16e..6e9ec9c40 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -182,7 +182,7 @@ func BuildRangePreparedComparison(columns *ColumnList, args []interface{}, compa return BuildRangeComparison(columns.Names(), values, args, comparisonSign) } -func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartValues, rangeEndValues []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool) (result string, explodedArgs []interface{}, err error) { +func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartValues, rangeEndValues []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool, noWait bool) (result string, explodedArgs []interface{}, err error) { if len(sharedColumns) == 0 { return "", explodedArgs, fmt.Errorf("Got 0 shared columns in BuildRangeInsertQuery") } @@ -214,7 +214,11 @@ func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName strin explodedArgs = append(explodedArgs, rangeExplodedArgs...) transactionalClause := "" if transactionalTable { - transactionalClause = "for share nowait" + if noWait { + transactionalClause = "for share nowait" + } else { + transactionalClause = "lock in share mode" + } } rangeEndComparison, rangeExplodedArgs, err := BuildRangeComparison(uniqueKeyColumns.Names(), rangeEndValues, rangeEndArgs, LessThanOrEqualsComparisonSign) if err != nil { @@ -241,10 +245,10 @@ func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName strin return result, explodedArgs, nil } -func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool) (result string, explodedArgs []interface{}, err error) { +func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool, noWait bool) (result string, explodedArgs []interface{}, err error) { rangeStartValues := buildColumnsPreparedValues(uniqueKeyColumns) rangeEndValues := buildColumnsPreparedValues(uniqueKeyColumns) - return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable) + return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable, noWait) } func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) { diff --git a/go/sql/builder_test.go b/go/sql/builder_test.go index 5e225ccd3..2dd85c1f1 100644 --- a/go/sql/builder_test.go +++ b/go/sql/builder_test.go @@ -171,7 +171,7 @@ func TestBuildRangeInsertQuery(t *testing.T) { rangeStartArgs := []interface{}{3} rangeEndArgs := []interface{}{103} - query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, true) + query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, true, true) require.NoError(t, err) expected := ` insert /* gh-ost mydb.tbl */ ignore @@ -199,7 +199,7 @@ func TestBuildRangeInsertQuery(t *testing.T) { rangeStartArgs := []interface{}{3, 17} rangeEndArgs := []interface{}{103, 117} - query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, true) + query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, true, true) require.NoError(t, err) expected := ` insert /* gh-ost mydb.tbl */ ignore @@ -241,7 +241,7 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { rangeStartArgs := []interface{}{3} rangeEndArgs := []interface{}{103} - query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, true) + query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, true, true) require.NoError(t, err) expected := ` insert /* gh-ost mydb.tbl */ ignore @@ -270,7 +270,7 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { rangeStartArgs := []interface{}{3, 17} rangeEndArgs := []interface{}{103, 117} - query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, true) + query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, true, true) require.NoError(t, err) expected := ` insert /* gh-ost mydb.tbl */ ignore @@ -305,7 +305,7 @@ func TestBuildRangeInsertPreparedQuery(t *testing.T) { rangeStartArgs := []interface{}{3, 17} rangeEndArgs := []interface{}{103, 117} - query, explodedArgs, err := BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, true, true) + query, explodedArgs, err := BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, true, true, true) require.NoError(t, err) expected := ` insert /* gh-ost mydb.tbl */ ignore