From ee42f9aced99f3bab81eac0db38ca81e14b40dd6 Mon Sep 17 00:00:00 2001 From: Lisandro Pin Date: Mon, 30 Jun 2025 19:01:06 +0200 Subject: [PATCH] Handle generated columns on ghostferry write operations. This PR modifies all `INSERT` logic so virtual (a.k.a generated) MySQL columns are not attempted to insert into, which otherwise breaks the ferrying process. See also https://github.com/Shopify/ghostferry/issues/338. --- dml_events.go | 47 +++++++++++--------- row_batch.go | 66 +++++++++++++++++++++++++--- test/go/dml_events_test.go | 30 +++++++++++++ test/go/row_batch_test.go | 26 +++++++++++ testhelpers/integration_test_case.go | 2 +- 5 files changed, 144 insertions(+), 27 deletions(-) diff --git a/dml_events.go b/dml_events.go index a87b1c13e..ba87a3fcb 100644 --- a/dml_events.go +++ b/dml_events.go @@ -42,14 +42,14 @@ type RowData []interface{} // https://github.com/Shopify/ghostferry/issues/165. // // In summary: -// - This code receives values from both go-sql-driver/mysql and -// go-mysql-org/go-mysql. -// - go-sql-driver/mysql gives us int64 for signed integer, and uint64 in a byte -// slice for unsigned integer. -// - go-mysql-org/go-mysql gives us int64 for signed integer, and uint64 for -// unsigned integer. -// - We currently make this function deal with both cases. In the future we can -// investigate alternative solutions. +// - This code receives values from both go-sql-driver/mysql and +// go-mysql-org/go-mysql. +// - go-sql-driver/mysql gives us int64 for signed integer, and uint64 in a byte +// slice for unsigned integer. +// - go-mysql-org/go-mysql gives us int64 for signed integer, and uint64 for +// unsigned integer. +// - We currently make this function deal with both cases. In the future we can +// investigate alternative solutions. func (r RowData) GetUint64(colIdx int) (uint64, error) { u64, ok := Uint64Value(r[colIdx]) if ok { @@ -175,7 +175,7 @@ func (e *BinlogInsertEvent) AsSQLString(schemaName, tableName string) (string, e query := "INSERT IGNORE INTO " + QuotedTableNameFromString(schemaName, tableName) + " (" + strings.Join(quotedColumnNames(e.table), ",") + ")" + - " VALUES (" + buildStringListForValues(e.table.Columns, e.newValues) + ")" + " VALUES (" + buildStringListForValues(e.table, e.newValues) + ")" return query, nil } @@ -324,9 +324,13 @@ func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos, re } func quotedColumnNames(table *TableSchema) []string { - cols := make([]string, len(table.Columns)) - for i, column := range table.Columns { - cols[i] = QuoteField(column.Name) + cols := []string{} + + for _, c := range table.Columns { + if c.IsVirtual { + continue + } + cols = append(cols, QuoteField(c.Name)) } return cols @@ -347,15 +351,18 @@ func verifyValuesHasTheSameLengthAsColumns(table *TableSchema, values ...RowData return nil } -func buildStringListForValues(columns []schema.TableColumn, values []interface{}) string { +func buildStringListForValues(table *TableSchema, values []interface{}) string { var buffer []byte for i, value := range values { - if i > 0 { - buffer = append(buffer, ',') + if table.Columns[i].IsVirtual { + continue } - buffer = appendEscapedValue(buffer, value, columns[i]) + if len(buffer) != 0 { + buffer = append(buffer, ',') + } + buffer = appendEscapedValue(buffer, value, table.Columns[i]) } return string(buffer) @@ -501,10 +508,10 @@ func Int64Value(value interface{}) (int64, bool) { // // This is specifically mentioned in the the below link: // -// When BINARY values are stored, they are right-padded with the pad value -// to the specified length. The pad value is 0x00 (the zero byte). Values -// are right-padded with 0x00 for inserts, and no trailing bytes are removed -// for retrievals. +// When BINARY values are stored, they are right-padded with the pad value +// to the specified length. The pad value is 0x00 (the zero byte). Values +// are right-padded with 0x00 for inserts, and no trailing bytes are removed +// for retrievals. // // ref: https://dev.mysql.com/doc/refman/5.7/en/binary-varbinary.html func appendEscapedString(buffer []byte, value string, rightPadToLengthWithZeroBytes int) []byte { diff --git a/row_batch.go b/row_batch.go index 4426fc127..d2aed4d59 100644 --- a/row_batch.go +++ b/row_batch.go @@ -64,23 +64,77 @@ func (e *RowBatch) AsSQLQuery(schemaName, tableName string) (string, []interface return "", nil, err } - valuesStr := "(" + strings.Repeat("?,", len(e.columns)-1) + "?)" + vcm := e.virtualColumnsMap() + valuesStr := "(" + strings.Repeat("?,", e.activeColumnCount(vcm)-1) + "?)" valuesStr = strings.Repeat(valuesStr+",", len(e.values)-1) + valuesStr query := "INSERT IGNORE INTO " + QuotedTableNameFromString(schemaName, tableName) + - " (" + strings.Join(QuoteFields(e.columns), ",") + ") VALUES " + valuesStr + " (" + e.quotedFields(vcm) + ") VALUES " + valuesStr - return query, e.flattenRowData(), nil + return query, e.flattenRowData(vcm), nil } -func (e *RowBatch) flattenRowData() []interface{} { - rowSize := len(e.values[0]) +// virtualColumnsMap returns a map of given columns (by index) -> whether the column is virtual (i.e. generated). +func (e *RowBatch) virtualColumnsMap() map[int]bool { + res := map[int]bool{} + + for i, name := range e.columns { + isVirtual := false + for _, c := range e.table.Columns { + if name == c.Name && c.IsVirtual { + isVirtual = true + break + } + } + + res[i] = isVirtual + } + + return res +} + +// activeColumnCount returns the number of active (non-virtual) columns for this RowBatch. +func (e *RowBatch) activeColumnCount(vcm map[int]bool) int { + if vcm == nil { + return len(e.columns) + } + + count := 0 + for _, isVirtual := range vcm { + if !isVirtual { + count++ + } + } + return count +} + +// quotedFields returns a string with comma-separated quoted field names for INSERTs. +func (e *RowBatch) quotedFields(vcm map[int]bool) string { + cols := []string{} + for i, name := range e.columns { + if vcm != nil && vcm[i] { + continue + } + cols = append(cols, name) + } + + return strings.Join(QuoteFields(cols), ",") +} + +// flattenRowData flattens RowData values into a single array for INSERTs. +func (e *RowBatch) flattenRowData(vcm map[int]bool) []interface{} { + rowSize := e.activeColumnCount(vcm) flattened := make([]interface{}, rowSize*len(e.values)) for rowIdx, row := range e.values { + i := 0 for colIdx, col := range row { - flattened[rowIdx*rowSize+colIdx] = col + if vcm != nil && vcm[colIdx] { + continue + } + flattened[rowIdx*rowSize+i] = col + i++ } } diff --git a/test/go/dml_events_test.go b/test/go/dml_events_test.go index ef8928e0c..626ba4a06 100644 --- a/test/go/dml_events_test.go +++ b/test/go/dml_events_test.go @@ -83,6 +83,36 @@ func (this *DMLEventsTestSuite) TestBinlogInsertEventGeneratesInsertQuery() { this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col1`,`col2`,`col3`) VALUES (1002,CAST('{\"val\": 42.0}' AS JSON),0)", q3) } +func (this *DMLEventsTestSuite) TestBinlogInsertEventGeneratesInsertQueryWithVirtualColumns() { + rowsEvent := &replication.RowsEvent{ + Table: this.tableMapEvent, + Rows: [][]interface{}{ + {1000, []byte("val1"), true}, + {1001, []byte("val2"), false}, + {1002, "{\"val\": 42.0}", false}, + }, + } + + // column 'col1' (#0) is generated so we should not insert into it. + this.targetTable.Columns[0].IsVirtual = true + + dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.eventBase, rowsEvent) + this.Require().Nil(err) + this.Require().Equal(3, len(dmlEvents)) + + q1, err := dmlEvents[0].AsSQLString(this.targetTable.Schema, this.targetTable.Name) + this.Require().Nil(err) + this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col2`,`col3`) VALUES (_binary'val1',1)", q1) + + q2, err := dmlEvents[1].AsSQLString(this.targetTable.Schema, this.targetTable.Name) + this.Require().Nil(err) + this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col2`,`col3`) VALUES (_binary'val2',0)", q2) + + q3, err := dmlEvents[2].AsSQLString(this.targetTable.Schema, this.targetTable.Name) + this.Require().Nil(err) + this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col2`,`col3`) VALUES (CAST('{\"val\": 42.0}' AS JSON),0)", q3) +} + func (this *DMLEventsTestSuite) TestBinlogInsertEventWithWrongColumnsReturnsError() { rowsEvent := &replication.RowsEvent{ Table: this.tableMapEvent, diff --git a/test/go/row_batch_test.go b/test/go/row_batch_test.go index 62ffbf9dd..633402dea 100644 --- a/test/go/row_batch_test.go +++ b/test/go/row_batch_test.go @@ -73,6 +73,32 @@ func (this *RowBatchTestSuite) TestRowBatchGeneratesInsertQuery() { this.Require().Equal(expected, v1) } +func (this *RowBatchTestSuite) TestRowBatchGeneratesInsertQueryWithVirtualColumns() { + vals := []ghostferry.RowData{ + ghostferry.RowData{1000, []byte("val1"), true}, + ghostferry.RowData{1001, []byte("val2"), true}, + ghostferry.RowData{1002, []byte("val3"), true}, + } + + // column 'col2' (#1) is generated so we should not insert into it. + this.targetTable.Columns[1].IsVirtual = true + + batch := ghostferry.NewRowBatch(this.sourceTable, vals, 0) + this.Require().Equal(vals, batch.Values()) + + q1, v1, err := batch.AsSQLQuery(this.targetTable.Schema, this.targetTable.Name) + this.Require().Nil(err) + this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col1`,`col3`) VALUES (?,?),(?,?),(?,?)", q1) + + expected := []interface{}{ + 1000, true, + 1001, true, + 1002, true, + } + + this.Require().Equal(expected, v1) +} + func (this *RowBatchTestSuite) TestRowBatchWithWrongColumnsReturnsError() { vals := []ghostferry.RowData{ ghostferry.RowData{1000, []byte("val0"), true}, diff --git a/testhelpers/integration_test_case.go b/testhelpers/integration_test_case.go index 5f4f767ff..34d8f32fc 100644 --- a/testhelpers/integration_test_case.go +++ b/testhelpers/integration_test_case.go @@ -122,7 +122,7 @@ func (this *IntegrationTestCase) VerifyData() { } if !verificationResult.DataCorrect { - this.T.Fatalf(verificationResult.Message) + this.T.Fatal(verificationResult.Message) } }