@@ -60,6 +60,10 @@ type Applier struct {
60
60
migrationContext * base.MigrationContext
61
61
finishedMigrating int64
62
62
name string
63
+
64
+ dmlDeleteQueryBuilder * sql.DMLDeleteQueryBuilder
65
+ dmlInsertQueryBuilder * sql.DMLInsertQueryBuilder
66
+ dmlUpdateQueryBuilder * sql.DMLUpdateQueryBuilder
63
67
}
64
68
65
69
func NewApplier (migrationContext * base.MigrationContext ) * Applier {
@@ -106,6 +110,37 @@ func (this *Applier) InitDBConnections() (err error) {
106
110
return nil
107
111
}
108
112
113
+ func (this * Applier ) prepareQueries () (err error ) {
114
+ if this .dmlDeleteQueryBuilder , err = sql .NewDMLDeleteQueryBuilder (
115
+ this .migrationContext .DatabaseName ,
116
+ this .migrationContext .GetGhostTableName (),
117
+ this .migrationContext .OriginalTableColumns ,
118
+ & this .migrationContext .UniqueKey .Columns ,
119
+ ); err != nil {
120
+ return err
121
+ }
122
+ if this .dmlInsertQueryBuilder , err = sql .NewDMLInsertQueryBuilder (
123
+ this .migrationContext .DatabaseName ,
124
+ this .migrationContext .GetGhostTableName (),
125
+ this .migrationContext .OriginalTableColumns ,
126
+ this .migrationContext .SharedColumns ,
127
+ this .migrationContext .MappedSharedColumns ,
128
+ ); err != nil {
129
+ return err
130
+ }
131
+ if this .dmlUpdateQueryBuilder , err = sql .NewDMLUpdateQueryBuilder (
132
+ this .migrationContext .DatabaseName ,
133
+ this .migrationContext .GetGhostTableName (),
134
+ this .migrationContext .OriginalTableColumns ,
135
+ this .migrationContext .SharedColumns ,
136
+ this .migrationContext .MappedSharedColumns ,
137
+ & this .migrationContext .UniqueKey .Columns ,
138
+ ); err != nil {
139
+ return err
140
+ }
141
+ return nil
142
+ }
143
+
109
144
// validateAndReadGlobalVariables potentially reads server global variables, such as the time_zone and wait_timeout.
110
145
func (this * Applier ) validateAndReadGlobalVariables () error {
111
146
query := `select /* gh-ost */ @@global.time_zone, @@global.wait_timeout`
@@ -631,6 +666,8 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
631
666
this .migrationContext .MigrationIterationRangeMaxValues .AbstractValues (),
632
667
this .migrationContext .GetIteration () == 0 ,
633
668
this .migrationContext .IsTransactionalTable (),
669
+ // TODO: Don't hardcode this
670
+ strings .HasPrefix (this .migrationContext .ApplierMySQLVersion , "8." ),
634
671
)
635
672
if err != nil {
636
673
return chunkSize , rowsAffected , duration , err
@@ -1135,35 +1172,36 @@ func (this *Applier) updateModifiesUniqueKeyColumns(dmlEvent *binlog.BinlogDMLEv
1135
1172
1136
1173
// buildDMLEventQuery creates a query to operate on the ghost table, based on an intercepted binlog
1137
1174
// event entry on the original table.
1138
- func (this * Applier ) buildDMLEventQuery (dmlEvent * binlog.BinlogDMLEvent ) ( results []( * dmlBuildResult )) {
1175
+ func (this * Applier ) buildDMLEventQuery (dmlEvent * binlog.BinlogDMLEvent ) [] * dmlBuildResult {
1139
1176
switch dmlEvent .DML {
1140
1177
case binlog .DeleteDML :
1141
1178
{
1142
- query , uniqueKeyArgs , err := sql . BuildDMLDeleteQuery ( dmlEvent . DatabaseName , this .migrationContext . GetGhostTableName (), this . migrationContext . OriginalTableColumns , & this . migrationContext . UniqueKey . Columns , dmlEvent .WhereColumnValues .AbstractValues ())
1143
- return append ( results , newDmlBuildResult (query , uniqueKeyArgs , - 1 , err ))
1179
+ query , uniqueKeyArgs , err := this .dmlDeleteQueryBuilder . BuildQuery ( dmlEvent .WhereColumnValues .AbstractValues ())
1180
+ return [] * dmlBuildResult { newDmlBuildResult (query , uniqueKeyArgs , - 1 , err )}
1144
1181
}
1145
1182
case binlog .InsertDML :
1146
1183
{
1147
- query , sharedArgs , err := sql . BuildDMLInsertQuery ( dmlEvent . DatabaseName , this .migrationContext . GetGhostTableName (), this . migrationContext . OriginalTableColumns , this . migrationContext . SharedColumns , this . migrationContext . MappedSharedColumns , dmlEvent .NewColumnValues .AbstractValues ())
1148
- return append ( results , newDmlBuildResult (query , sharedArgs , 1 , err ))
1184
+ query , sharedArgs , err := this .dmlInsertQueryBuilder . BuildQuery ( dmlEvent .NewColumnValues .AbstractValues ())
1185
+ return [] * dmlBuildResult { newDmlBuildResult (query , sharedArgs , 1 , err )}
1149
1186
}
1150
1187
case binlog .UpdateDML :
1151
1188
{
1152
1189
if _ , isModified := this .updateModifiesUniqueKeyColumns (dmlEvent ); isModified {
1190
+ results := make ([]* dmlBuildResult , 0 , 2 )
1153
1191
dmlEvent .DML = binlog .DeleteDML
1154
1192
results = append (results , this .buildDMLEventQuery (dmlEvent )... )
1155
1193
dmlEvent .DML = binlog .InsertDML
1156
1194
results = append (results , this .buildDMLEventQuery (dmlEvent )... )
1157
1195
return results
1158
1196
}
1159
- query , sharedArgs , uniqueKeyArgs , err := sql . BuildDMLUpdateQuery ( dmlEvent . DatabaseName , this .migrationContext . GetGhostTableName (), this . migrationContext . OriginalTableColumns , this . migrationContext . SharedColumns , this . migrationContext . MappedSharedColumns , & this . migrationContext . UniqueKey . Columns , dmlEvent .NewColumnValues .AbstractValues (), dmlEvent .WhereColumnValues .AbstractValues ())
1197
+ query , sharedArgs , uniqueKeyArgs , err := this .dmlUpdateQueryBuilder . BuildQuery ( dmlEvent .NewColumnValues .AbstractValues (), dmlEvent .WhereColumnValues .AbstractValues ())
1160
1198
args := sqlutils .Args ()
1161
1199
args = append (args , sharedArgs ... )
1162
1200
args = append (args , uniqueKeyArgs ... )
1163
- return append ( results , newDmlBuildResult (query , args , 0 , err ))
1201
+ return [] * dmlBuildResult { newDmlBuildResult (query , args , 0 , err )}
1164
1202
}
1165
1203
}
1166
- return append ( results , newDmlBuildResultError (fmt .Errorf ("Unknown dml event type: %+v" , dmlEvent .DML )))
1204
+ return [] * dmlBuildResult { newDmlBuildResultError (fmt .Errorf ("Unknown dml event type: %+v" , dmlEvent .DML ))}
1167
1205
}
1168
1206
1169
1207
// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
0 commit comments