Skip to content

Commit 4960711

Browse files
authored
Merge branch 'master' into meiji163/generated-col-error
2 parents 1b863b0 + a834c00 commit 4960711

File tree

5 files changed

+423
-118
lines changed

5 files changed

+423
-118
lines changed

go/logic/applier.go

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ type Applier struct {
6060
migrationContext *base.MigrationContext
6161
finishedMigrating int64
6262
name string
63+
64+
dmlDeleteQueryBuilder *sql.DMLDeleteQueryBuilder
65+
dmlInsertQueryBuilder *sql.DMLInsertQueryBuilder
66+
dmlUpdateQueryBuilder *sql.DMLUpdateQueryBuilder
6367
}
6468

6569
func NewApplier(migrationContext *base.MigrationContext) *Applier {
@@ -106,6 +110,37 @@ func (this *Applier) InitDBConnections() (err error) {
106110
return nil
107111
}
108112

113+
func (this *Applier) prepareQueries() (err error) {
114+
if this.dmlDeleteQueryBuilder, err = sql.NewDMLDeleteQueryBuilder(
115+
this.migrationContext.DatabaseName,
116+
this.migrationContext.GetGhostTableName(),
117+
this.migrationContext.OriginalTableColumns,
118+
&this.migrationContext.UniqueKey.Columns,
119+
); err != nil {
120+
return err
121+
}
122+
if this.dmlInsertQueryBuilder, err = sql.NewDMLInsertQueryBuilder(
123+
this.migrationContext.DatabaseName,
124+
this.migrationContext.GetGhostTableName(),
125+
this.migrationContext.OriginalTableColumns,
126+
this.migrationContext.SharedColumns,
127+
this.migrationContext.MappedSharedColumns,
128+
); err != nil {
129+
return err
130+
}
131+
if this.dmlUpdateQueryBuilder, err = sql.NewDMLUpdateQueryBuilder(
132+
this.migrationContext.DatabaseName,
133+
this.migrationContext.GetGhostTableName(),
134+
this.migrationContext.OriginalTableColumns,
135+
this.migrationContext.SharedColumns,
136+
this.migrationContext.MappedSharedColumns,
137+
&this.migrationContext.UniqueKey.Columns,
138+
); err != nil {
139+
return err
140+
}
141+
return nil
142+
}
143+
109144
// validateAndReadGlobalVariables potentially reads server global variables, such as the time_zone and wait_timeout.
110145
func (this *Applier) validateAndReadGlobalVariables() error {
111146
query := `select /* gh-ost */ @@global.time_zone, @@global.wait_timeout`
@@ -631,6 +666,8 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
631666
this.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(),
632667
this.migrationContext.GetIteration() == 0,
633668
this.migrationContext.IsTransactionalTable(),
669+
// TODO: Don't hardcode this
670+
strings.HasPrefix(this.migrationContext.ApplierMySQLVersion, "8."),
634671
)
635672
if err != nil {
636673
return chunkSize, rowsAffected, duration, err
@@ -1135,35 +1172,36 @@ func (this *Applier) updateModifiesUniqueKeyColumns(dmlEvent *binlog.BinlogDMLEv
11351172

11361173
// buildDMLEventQuery creates a query to operate on the ghost table, based on an intercepted binlog
11371174
// event entry on the original table.
1138-
func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (results [](*dmlBuildResult)) {
1175+
func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlBuildResult {
11391176
switch dmlEvent.DML {
11401177
case binlog.DeleteDML:
11411178
{
1142-
query, uniqueKeyArgs, err := sql.BuildDMLDeleteQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.WhereColumnValues.AbstractValues())
1143-
return append(results, newDmlBuildResult(query, uniqueKeyArgs, -1, err))
1179+
query, uniqueKeyArgs, err := this.dmlDeleteQueryBuilder.BuildQuery(dmlEvent.WhereColumnValues.AbstractValues())
1180+
return []*dmlBuildResult{newDmlBuildResult(query, uniqueKeyArgs, -1, err)}
11441181
}
11451182
case binlog.InsertDML:
11461183
{
1147-
query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, dmlEvent.NewColumnValues.AbstractValues())
1148-
return append(results, newDmlBuildResult(query, sharedArgs, 1, err))
1184+
query, sharedArgs, err := this.dmlInsertQueryBuilder.BuildQuery(dmlEvent.NewColumnValues.AbstractValues())
1185+
return []*dmlBuildResult{newDmlBuildResult(query, sharedArgs, 1, err)}
11491186
}
11501187
case binlog.UpdateDML:
11511188
{
11521189
if _, isModified := this.updateModifiesUniqueKeyColumns(dmlEvent); isModified {
1190+
results := make([]*dmlBuildResult, 0, 2)
11531191
dmlEvent.DML = binlog.DeleteDML
11541192
results = append(results, this.buildDMLEventQuery(dmlEvent)...)
11551193
dmlEvent.DML = binlog.InsertDML
11561194
results = append(results, this.buildDMLEventQuery(dmlEvent)...)
11571195
return results
11581196
}
1159-
query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
1197+
query, sharedArgs, uniqueKeyArgs, err := this.dmlUpdateQueryBuilder.BuildQuery(dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
11601198
args := sqlutils.Args()
11611199
args = append(args, sharedArgs...)
11621200
args = append(args, uniqueKeyArgs...)
1163-
return append(results, newDmlBuildResult(query, args, 0, err))
1201+
return []*dmlBuildResult{newDmlBuildResult(query, args, 0, err)}
11641202
}
11651203
}
1166-
return append(results, newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML)))
1204+
return []*dmlBuildResult{newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML))}
11671205
}
11681206

11691207
// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table

go/logic/applier_test.go

Lines changed: 174 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
gosql "database/sql"
1111
"strings"
1212
"testing"
13+
"time"
1314

1415
"github.com/stretchr/testify/require"
1516
"github.com/stretchr/testify/suite"
@@ -100,6 +101,7 @@ func TestApplierBuildDMLEventQuery(t *testing.T) {
100101
columnValues := sql.ToColumnValues([]interface{}{123456, 42})
101102

102103
migrationContext := base.NewMigrationContext()
104+
migrationContext.DatabaseName = "test"
103105
migrationContext.OriginalTableName = "test"
104106
migrationContext.OriginalTableColumns = columns
105107
migrationContext.SharedColumns = columns
@@ -110,6 +112,7 @@ func TestApplierBuildDMLEventQuery(t *testing.T) {
110112
}
111113

112114
applier := NewApplier(migrationContext)
115+
applier.prepareQueries()
113116

114117
t.Run("delete", func(t *testing.T) {
115118
binlogEvent := &binlog.BinlogDMLEvent{
@@ -199,6 +202,30 @@ type ApplierTestSuite struct {
199202
mysqlContainer testcontainers.Container
200203
}
201204

205+
func (suite *ApplierTestSuite) getConnectionConfig(ctx context.Context) (*mysql.ConnectionConfig, error) {
206+
host, err := suite.mysqlContainer.ContainerIP(ctx)
207+
if err != nil {
208+
return nil, err
209+
}
210+
211+
config := mysql.NewConnectionConfig()
212+
config.Key.Hostname = host
213+
config.Key.Port = 3306
214+
config.User = "root"
215+
config.Password = "root-password"
216+
217+
return config, nil
218+
}
219+
220+
func (suite *ApplierTestSuite) getDb(ctx context.Context) (*gosql.DB, error) {
221+
host, err := suite.mysqlContainer.ContainerIP(ctx)
222+
if err != nil {
223+
return nil, err
224+
}
225+
226+
return gosql.Open("mysql", "root:root-password@tcp("+host+":3306)/test")
227+
}
228+
202229
func (suite *ApplierTestSuite) SetupSuite() {
203230
ctx := context.Background()
204231
req := testcontainers.ContainerRequest{
@@ -229,7 +256,7 @@ func (suite *ApplierTestSuite) SetupTest() {
229256
suite.Require().NoError(err)
230257
suite.Require().Equalf(0, rc, "failed to created database: expected exit code 0, got %d", rc)
231258

232-
rc, _, err = suite.mysqlContainer.Exec(ctx, []string{"mysql", "-uroot", "-proot-password", "-e", "CREATE TABLE test.testing (id INT, item_id INT);"})
259+
rc, _, err = suite.mysqlContainer.Exec(ctx, []string{"mysql", "-uroot", "-proot-password", "-e", "CREATE TABLE test.testing (id INT, item_id INT, PRIMARY KEY (id));"})
233260
suite.Require().NoError(err)
234261
suite.Require().Equalf(0, rc, "failed to created table: expected exit code 0, got %d", rc)
235262
}
@@ -245,15 +272,11 @@ func (suite *ApplierTestSuite) TearDownTest() {
245272
func (suite *ApplierTestSuite) TestInitDBConnections() {
246273
ctx := context.Background()
247274

248-
host, err := suite.mysqlContainer.ContainerIP(ctx)
275+
connectionConfig, err := suite.getConnectionConfig(ctx)
249276
suite.Require().NoError(err)
250277

251278
migrationContext := base.NewMigrationContext()
252-
migrationContext.ApplierConnectionConfig = mysql.NewConnectionConfig()
253-
migrationContext.ApplierConnectionConfig.Key.Hostname = host
254-
migrationContext.ApplierConnectionConfig.Key.Port = 3306
255-
migrationContext.ApplierConnectionConfig.User = "root"
256-
migrationContext.ApplierConnectionConfig.Password = "root-password"
279+
migrationContext.ApplierConnectionConfig = connectionConfig
257280
migrationContext.DatabaseName = "test"
258281
migrationContext.OriginalTableName = "testing"
259282
migrationContext.SetConnectionConfig("innodb")
@@ -274,24 +297,25 @@ func (suite *ApplierTestSuite) TestInitDBConnections() {
274297
func (suite *ApplierTestSuite) TestApplyDMLEventQueries() {
275298
ctx := context.Background()
276299

277-
host, err := suite.mysqlContainer.ContainerIP(ctx)
300+
connectionConfig, err := suite.getConnectionConfig(ctx)
278301
suite.Require().NoError(err)
279302

280303
migrationContext := base.NewMigrationContext()
281-
migrationContext.ApplierConnectionConfig = mysql.NewConnectionConfig()
282-
migrationContext.ApplierConnectionConfig.Key.Hostname = host
283-
migrationContext.ApplierConnectionConfig.Key.Port = 3306
284-
migrationContext.ApplierConnectionConfig.User = "root"
285-
migrationContext.ApplierConnectionConfig.Password = "root-password"
304+
migrationContext.ApplierConnectionConfig = connectionConfig
286305
migrationContext.DatabaseName = "test"
287306
migrationContext.OriginalTableName = "testing"
288307
migrationContext.SetConnectionConfig("innodb")
289308

290309
migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "item_id"})
291310
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "item_id"})
292311
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "item_id"})
312+
migrationContext.UniqueKey = &sql.UniqueKey{
313+
Name: "primary_key",
314+
Columns: *sql.NewColumnList([]string{"id"}),
315+
}
293316

294317
applier := NewApplier(migrationContext)
318+
suite.Require().NoError(applier.prepareQueries())
295319
defer applier.Teardown()
296320

297321
err = applier.InitDBConnections()
@@ -313,7 +337,7 @@ func (suite *ApplierTestSuite) TestApplyDMLEventQueries() {
313337
suite.Require().NoError(err)
314338

315339
// Check that the row was inserted
316-
db, err := gosql.Open("mysql", "root:root-password@tcp("+host+":3306)/test")
340+
db, err := suite.getDb(ctx)
317341
suite.Require().NoError(err)
318342
defer db.Close()
319343

@@ -340,15 +364,11 @@ func (suite *ApplierTestSuite) TestApplyDMLEventQueries() {
340364
func (suite *ApplierTestSuite) TestValidateOrDropExistingTables() {
341365
ctx := context.Background()
342366

343-
host, err := suite.mysqlContainer.ContainerIP(ctx)
367+
connectionConfig, err := suite.getConnectionConfig(ctx)
344368
suite.Require().NoError(err)
345369

346370
migrationContext := base.NewMigrationContext()
347-
migrationContext.ApplierConnectionConfig = mysql.NewConnectionConfig()
348-
migrationContext.ApplierConnectionConfig.Key.Hostname = host
349-
migrationContext.ApplierConnectionConfig.Key.Port = 3306
350-
migrationContext.ApplierConnectionConfig.User = "root"
351-
migrationContext.ApplierConnectionConfig.Password = "root-password"
371+
migrationContext.ApplierConnectionConfig = connectionConfig
352372
migrationContext.DatabaseName = "test"
353373
migrationContext.OriginalTableName = "testing"
354374
migrationContext.SetConnectionConfig("innodb")
@@ -367,6 +387,140 @@ func (suite *ApplierTestSuite) TestValidateOrDropExistingTables() {
367387
suite.Require().NoError(err)
368388
}
369389

390+
func (suite *ApplierTestSuite) TestApplyIterationInsertQuery() {
391+
ctx := context.Background()
392+
393+
connectionConfig, err := suite.getConnectionConfig(ctx)
394+
suite.Require().NoError(err)
395+
396+
migrationContext := base.NewMigrationContext()
397+
migrationContext.ApplierConnectionConfig = connectionConfig
398+
migrationContext.DatabaseName = "test"
399+
migrationContext.OriginalTableName = "testing"
400+
migrationContext.ChunkSize = 10
401+
migrationContext.SetConnectionConfig("innodb")
402+
403+
db, err := suite.getDb(ctx)
404+
suite.Require().NoError(err)
405+
defer db.Close()
406+
407+
_, err = db.Exec("CREATE TABLE test._testing_gho (id INT, item_id INT, PRIMARY KEY (id))")
408+
suite.Require().NoError(err)
409+
410+
// Insert some test values
411+
for i := 1; i <= 10; i++ {
412+
_, err = db.Exec("INSERT INTO test.testing (id, item_id) VALUES (?, ?)", i, i)
413+
suite.Require().NoError(err)
414+
}
415+
416+
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "item_id"})
417+
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "item_id"})
418+
migrationContext.UniqueKey = &sql.UniqueKey{
419+
Name: "PRIMARY",
420+
Columns: *sql.NewColumnList([]string{"id"}),
421+
}
422+
423+
migrationContext.MigrationIterationRangeMinValues = sql.ToColumnValues([]interface{}{1})
424+
migrationContext.MigrationIterationRangeMaxValues = sql.ToColumnValues([]interface{}{10})
425+
426+
applier := NewApplier(migrationContext)
427+
defer applier.Teardown()
428+
429+
err = applier.InitDBConnections()
430+
suite.Require().NoError(err)
431+
432+
chunkSize, rowsAffected, duration, err := applier.ApplyIterationInsertQuery()
433+
suite.Require().NoError(err)
434+
435+
suite.Require().Equal(migrationContext.ChunkSize, chunkSize)
436+
suite.Require().Equal(int64(10), rowsAffected)
437+
suite.Require().Greater(duration, time.Duration(0))
438+
439+
// Check that the rows were inserted
440+
rows, err := db.Query("SELECT * FROM test._testing_gho")
441+
suite.Require().NoError(err)
442+
defer rows.Close()
443+
444+
var count, id, item_id int
445+
for rows.Next() {
446+
err = rows.Scan(&id, &item_id)
447+
suite.Require().NoError(err)
448+
count += 1
449+
}
450+
suite.Require().NoError(rows.Err())
451+
452+
suite.Require().Equal(10, count)
453+
}
454+
455+
func (suite *ApplierTestSuite) TestApplyIterationInsertQueryFailsFastWhenSelectingLockedRows() {
456+
ctx := context.Background()
457+
458+
connectionConfig, err := suite.getConnectionConfig(ctx)
459+
suite.Require().NoError(err)
460+
461+
migrationContext := base.NewMigrationContext()
462+
migrationContext.ApplierConnectionConfig = connectionConfig
463+
migrationContext.DatabaseName = "test"
464+
migrationContext.OriginalTableName = "testing"
465+
migrationContext.ChunkSize = 10
466+
migrationContext.TableEngine = "innodb"
467+
migrationContext.SetConnectionConfig("innodb")
468+
469+
db, err := suite.getDb(ctx)
470+
suite.Require().NoError(err)
471+
defer db.Close()
472+
473+
_, err = db.Exec("CREATE TABLE test._testing_gho (id INT, item_id INT, PRIMARY KEY (id))")
474+
suite.Require().NoError(err)
475+
476+
// Insert some test values
477+
for i := 1; i <= 10; i++ {
478+
_, err = db.Exec("INSERT INTO test.testing (id, item_id) VALUES (?, ?)", i, i)
479+
suite.Require().NoError(err)
480+
}
481+
482+
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "item_id"})
483+
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "item_id"})
484+
migrationContext.UniqueKey = &sql.UniqueKey{
485+
Name: "PRIMARY",
486+
Columns: *sql.NewColumnList([]string{"id"}),
487+
}
488+
489+
migrationContext.MigrationIterationRangeMinValues = sql.ToColumnValues([]interface{}{1})
490+
migrationContext.MigrationIterationRangeMaxValues = sql.ToColumnValues([]interface{}{10})
491+
492+
applier := NewApplier(migrationContext)
493+
defer applier.Teardown()
494+
495+
err = applier.InitDBConnections()
496+
suite.Require().NoError(err)
497+
498+
// Lock one of the rows
499+
tx, err := db.Begin()
500+
suite.Require().NoError(err)
501+
defer func() {
502+
suite.Require().NoError(tx.Rollback())
503+
}()
504+
505+
_, err = tx.Exec("SELECT * FROM test.testing WHERE id = 5 FOR UPDATE")
506+
suite.Require().NoError(err)
507+
508+
chunkSize, rowsAffected, duration, err := applier.ApplyIterationInsertQuery()
509+
suite.Require().Error(err)
510+
suite.Require().EqualError(err, "Error 3572 (HY000): Statement aborted because lock(s) could not be acquired immediately and NOWAIT is set.")
511+
512+
suite.Require().Equal(migrationContext.ChunkSize, chunkSize)
513+
suite.Require().Equal(int64(0), rowsAffected)
514+
suite.Require().Equal(time.Duration(0), duration)
515+
516+
// Check that the no rows were inserted
517+
var count int
518+
err = db.QueryRow("SELECT COUNT(*) FROM test._testing_gho").Scan(&count)
519+
suite.Require().NoError(err)
520+
521+
suite.Require().Equal(0, count)
522+
}
523+
370524
func TestApplier(t *testing.T) {
371525
suite.Run(t, new(ApplierTestSuite))
372526
}

go/logic/migrator.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,10 @@ func (this *Migrator) Migrate() (err error) {
386386
if err := this.inspector.inspectOriginalAndGhostTables(); err != nil {
387387
return err
388388
}
389+
// We can prepare some of the queries on the applier
390+
if err := this.applier.prepareQueries(); err != nil {
391+
return err
392+
}
389393
// Validation complete! We're good to execute this migration
390394
if err := this.hooksExecutor.onValidated(); err != nil {
391395
return err

0 commit comments

Comments
 (0)