Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 27 additions & 20 deletions dml_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ type RowData []interface{}
// https://github.com/Shopify/ghostferry/issues/165.
//
// In summary:
// - This code receives values from both go-sql-driver/mysql and
// go-mysql-org/go-mysql.
// - go-sql-driver/mysql gives us int64 for signed integer, and uint64 in a byte
// slice for unsigned integer.
// - go-mysql-org/go-mysql gives us int64 for signed integer, and uint64 for
// unsigned integer.
// - We currently make this function deal with both cases. In the future we can
// investigate alternative solutions.
// - This code receives values from both go-sql-driver/mysql and
// go-mysql-org/go-mysql.
// - go-sql-driver/mysql gives us int64 for signed integer, and uint64 in a byte
// slice for unsigned integer.
// - go-mysql-org/go-mysql gives us int64 for signed integer, and uint64 for
// unsigned integer.
// - We currently make this function deal with both cases. In the future we can
// investigate alternative solutions.
func (r RowData) GetUint64(colIdx int) (uint64, error) {
u64, ok := Uint64Value(r[colIdx])
if ok {
Expand Down Expand Up @@ -175,7 +175,7 @@ func (e *BinlogInsertEvent) AsSQLString(schemaName, tableName string) (string, e
query := "INSERT IGNORE INTO " +
QuotedTableNameFromString(schemaName, tableName) +
" (" + strings.Join(quotedColumnNames(e.table), ",") + ")" +
" VALUES (" + buildStringListForValues(e.table.Columns, e.newValues) + ")"
" VALUES (" + buildStringListForValues(e.table, e.newValues) + ")"

return query, nil
}
Expand Down Expand Up @@ -324,9 +324,13 @@ func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos, re
}

func quotedColumnNames(table *TableSchema) []string {
cols := make([]string, len(table.Columns))
for i, column := range table.Columns {
cols[i] = QuoteField(column.Name)
cols := []string{}

for _, c := range table.Columns {
if c.IsVirtual {
continue
}
cols = append(cols, QuoteField(c.Name))
}

return cols
Expand All @@ -347,15 +351,18 @@ func verifyValuesHasTheSameLengthAsColumns(table *TableSchema, values ...RowData
return nil
}

func buildStringListForValues(columns []schema.TableColumn, values []interface{}) string {
func buildStringListForValues(table *TableSchema, values []interface{}) string {
var buffer []byte

for i, value := range values {
if i > 0 {
buffer = append(buffer, ',')
if table.Columns[i].IsVirtual {
continue
}

buffer = appendEscapedValue(buffer, value, columns[i])
if len(buffer) != 0 {
buffer = append(buffer, ',')
}
buffer = appendEscapedValue(buffer, value, table.Columns[i])
}

return string(buffer)
Expand Down Expand Up @@ -501,10 +508,10 @@ func Int64Value(value interface{}) (int64, bool) {
//
// This is specifically mentioned in the the below link:
//
// When BINARY values are stored, they are right-padded with the pad value
// to the specified length. The pad value is 0x00 (the zero byte). Values
// are right-padded with 0x00 for inserts, and no trailing bytes are removed
// for retrievals.
// When BINARY values are stored, they are right-padded with the pad value
// to the specified length. The pad value is 0x00 (the zero byte). Values
// are right-padded with 0x00 for inserts, and no trailing bytes are removed
// for retrievals.
//
// ref: https://dev.mysql.com/doc/refman/5.7/en/binary-varbinary.html
func appendEscapedString(buffer []byte, value string, rightPadToLengthWithZeroBytes int) []byte {
Expand Down
66 changes: 60 additions & 6 deletions row_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,23 +64,77 @@ func (e *RowBatch) AsSQLQuery(schemaName, tableName string) (string, []interface
return "", nil, err
}

valuesStr := "(" + strings.Repeat("?,", len(e.columns)-1) + "?)"
vcm := e.virtualColumnsMap()
valuesStr := "(" + strings.Repeat("?,", e.activeColumnCount(vcm)-1) + "?)"
valuesStr = strings.Repeat(valuesStr+",", len(e.values)-1) + valuesStr

query := "INSERT IGNORE INTO " +
QuotedTableNameFromString(schemaName, tableName) +
" (" + strings.Join(QuoteFields(e.columns), ",") + ") VALUES " + valuesStr
" (" + e.quotedFields(vcm) + ") VALUES " + valuesStr

return query, e.flattenRowData(), nil
return query, e.flattenRowData(vcm), nil
}

func (e *RowBatch) flattenRowData() []interface{} {
rowSize := len(e.values[0])
// virtualColumnsMap returns a map of given columns (by index) -> whether the column is virtual (i.e. generated).
func (e *RowBatch) virtualColumnsMap() map[int]bool {
res := map[int]bool{}

for i, name := range e.columns {
isVirtual := false
for _, c := range e.table.Columns {
if name == c.Name && c.IsVirtual {
isVirtual = true
break
}
}

res[i] = isVirtual
}

return res
}

// activeColumnCount returns the number of active (non-virtual) columns for this RowBatch.
func (e *RowBatch) activeColumnCount(vcm map[int]bool) int {
if vcm == nil {
return len(e.columns)
}

count := 0
for _, isVirtual := range vcm {
if !isVirtual {
count++
}
}
return count
}

// quotedFields returns a string with comma-separated quoted field names for INSERTs.
func (e *RowBatch) quotedFields(vcm map[int]bool) string {
cols := []string{}
for i, name := range e.columns {
if vcm != nil && vcm[i] {
continue
}
cols = append(cols, name)
}

return strings.Join(QuoteFields(cols), ",")
}

// flattenRowData flattens RowData values into a single array for INSERTs.
func (e *RowBatch) flattenRowData(vcm map[int]bool) []interface{} {
rowSize := e.activeColumnCount(vcm)
flattened := make([]interface{}, rowSize*len(e.values))

for rowIdx, row := range e.values {
i := 0
for colIdx, col := range row {
flattened[rowIdx*rowSize+colIdx] = col
if vcm != nil && vcm[colIdx] {
continue
}
flattened[rowIdx*rowSize+i] = col
i++
}
}

Expand Down
30 changes: 30 additions & 0 deletions test/go/dml_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,36 @@ func (this *DMLEventsTestSuite) TestBinlogInsertEventGeneratesInsertQuery() {
this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col1`,`col2`,`col3`) VALUES (1002,CAST('{\"val\": 42.0}' AS JSON),0)", q3)
}

func (this *DMLEventsTestSuite) TestBinlogInsertEventGeneratesInsertQueryWithVirtualColumns() {
rowsEvent := &replication.RowsEvent{
Table: this.tableMapEvent,
Rows: [][]interface{}{
{1000, []byte("val1"), true},
{1001, []byte("val2"), false},
{1002, "{\"val\": 42.0}", false},
},
}

// column 'col1' (#0) is generated so we should not insert into it.
this.targetTable.Columns[0].IsVirtual = true

dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.eventBase, rowsEvent)
this.Require().Nil(err)
this.Require().Equal(3, len(dmlEvents))

q1, err := dmlEvents[0].AsSQLString(this.targetTable.Schema, this.targetTable.Name)
this.Require().Nil(err)
this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col2`,`col3`) VALUES (_binary'val1',1)", q1)

q2, err := dmlEvents[1].AsSQLString(this.targetTable.Schema, this.targetTable.Name)
this.Require().Nil(err)
this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col2`,`col3`) VALUES (_binary'val2',0)", q2)

q3, err := dmlEvents[2].AsSQLString(this.targetTable.Schema, this.targetTable.Name)
this.Require().Nil(err)
this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col2`,`col3`) VALUES (CAST('{\"val\": 42.0}' AS JSON),0)", q3)
}

func (this *DMLEventsTestSuite) TestBinlogInsertEventWithWrongColumnsReturnsError() {
rowsEvent := &replication.RowsEvent{
Table: this.tableMapEvent,
Expand Down
26 changes: 26 additions & 0 deletions test/go/row_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,32 @@ func (this *RowBatchTestSuite) TestRowBatchGeneratesInsertQuery() {
this.Require().Equal(expected, v1)
}

func (this *RowBatchTestSuite) TestRowBatchGeneratesInsertQueryWithVirtualColumns() {
vals := []ghostferry.RowData{
ghostferry.RowData{1000, []byte("val1"), true},
ghostferry.RowData{1001, []byte("val2"), true},
ghostferry.RowData{1002, []byte("val3"), true},
}

// column 'col2' (#1) is generated so we should not insert into it.
this.targetTable.Columns[1].IsVirtual = true

batch := ghostferry.NewRowBatch(this.sourceTable, vals, 0)
this.Require().Equal(vals, batch.Values())

q1, v1, err := batch.AsSQLQuery(this.targetTable.Schema, this.targetTable.Name)
this.Require().Nil(err)
this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col1`,`col3`) VALUES (?,?),(?,?),(?,?)", q1)

expected := []interface{}{
1000, true,
1001, true,
1002, true,
}

this.Require().Equal(expected, v1)
}

func (this *RowBatchTestSuite) TestRowBatchWithWrongColumnsReturnsError() {
vals := []ghostferry.RowData{
ghostferry.RowData{1000, []byte("val0"), true},
Expand Down
2 changes: 1 addition & 1 deletion testhelpers/integration_test_case.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (this *IntegrationTestCase) VerifyData() {
}

if !verificationResult.DataCorrect {
this.T.Fatalf(verificationResult.Message)
this.T.Fatal(verificationResult.Message)
}
}

Expand Down