Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

consumer(ticdc): query row checksum from the tidb #11342

Merged
merged 12 commits into from
Jul 11, 2024
2 changes: 1 addition & 1 deletion cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1028,7 +1028,7 @@ func TestE2ERowLevelChecksum(t *testing.T) {
require.NoError(t, err)

// decoder enable checksum functionality.
decoder := avro.NewDecoder(codecConfig, schemaM, topic)
decoder := avro.NewDecoder(codecConfig, schemaM, topic, nil)
err = decoder.AddKeyValue(msg[0].Key, msg[0].Value)
require.NoError(t, err)

Expand Down
6 changes: 6 additions & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -1252,10 +1252,16 @@
// so it won't have an impact and no more full deep copy wastes memory.
deleteEvent := *updateEvent
deleteEvent.Columns = nil
if deleteEvent.Checksum != nil {
deleteEvent.Checksum.Current = 0
}

Check warning on line 1257 in cdc/model/sink.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/sink.go#L1256-L1257

Added lines #L1256 - L1257 were not covered by tests

insertEvent := *updateEvent
// NOTICE: clean up pre cols for insert event.
insertEvent.PreColumns = nil
if insertEvent.Checksum != nil {
insertEvent.Checksum.Previous = 0
}

Check warning on line 1264 in cdc/model/sink.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/sink.go#L1263-L1264

Added lines #L1263 - L1264 were not covered by tests

log.Debug("split update event", zap.Uint64("startTs", updateEvent.StartTs),
zap.Uint64("commitTs", updateEvent.CommitTs),
Expand Down
6 changes: 4 additions & 2 deletions cmd/kafka-consumer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewDecoder(ctx context.Context, option *option, upstreamTiDB *sql.DB) (code
if err != nil {
return decoder, cerror.Trace(err)
}
decoder = avro.NewDecoder(option.codecConfig, schemaM, option.topic)
decoder = avro.NewDecoder(option.codecConfig, schemaM, option.topic, upstreamTiDB)
case config.ProtocolSimple:
decoder, err = simple.NewDecoder(ctx, option.codecConfig, upstreamTiDB)
default:
Expand Down Expand Up @@ -116,13 +116,15 @@ func newWriter(ctx context.Context, o *option) *writer {
zap.Any("topic", o.topic), zap.Any("dispatcherRules", o.replicaConfig.Sink.DispatchRules))

var db *sql.DB
if o.codecConfig.LargeMessageHandle.HandleKeyOnly() {

if o.upstreamTiDBDSN != "" {
db, err = openDB(ctx, o.upstreamTiDBDSN)
if err != nil {
log.Panic("cannot open the upstream TiDB, handle key only enabled",
zap.String("dsn", o.upstreamTiDBDSN))
}
}

for i := 0; i < int(o.partitionNum); i++ {
decoder, err := NewDecoder(ctx, o, db)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/sink/codec/avro/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestDMLEventE2E(t *testing.T) {
schemaM, err := NewConfluentSchemaManager(ctx, "http://127.0.0.1:8081", nil)
require.NoError(t, err)

decoder := NewDecoder(codecConfig, schemaM, topic)
decoder := NewDecoder(codecConfig, schemaM, topic, nil)
err = decoder.AddKeyValue(message.Key, message.Value)
require.NoError(t, err)

Expand Down Expand Up @@ -95,7 +95,7 @@ func TestDDLEventE2E(t *testing.T) {
require.NotNil(t, message)

topic := "test-topic"
decoder := NewDecoder(codecConfig, nil, topic)
decoder := NewDecoder(codecConfig, nil, topic, nil)
err = decoder.AddKeyValue(message.Key, message.Value)
require.NoError(t, err)

Expand Down Expand Up @@ -129,7 +129,7 @@ func TestResolvedE2E(t *testing.T) {
require.NotNil(t, message)

topic := "test-topic"
decoder := NewDecoder(codecConfig, nil, topic)
decoder := NewDecoder(codecConfig, nil, topic, nil)
err = decoder.AddKeyValue(message.Key, message.Value)
require.NoError(t, err)

Expand Down
21 changes: 17 additions & 4 deletions pkg/sink/codec/avro/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import (
"context"
"database/sql"
"encoding/binary"
"encoding/json"
"fmt"
Expand All @@ -26,6 +27,7 @@
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/integrity"
"github.com/pingcap/tiflow/pkg/sink/codec"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"go.uber.org/zap"
Expand All @@ -35,6 +37,8 @@
config *common.Config
topic string

upstreamTiDB *sql.DB

schemaM SchemaManager

key []byte
Expand All @@ -46,11 +50,13 @@
config *common.Config,
schemaM SchemaManager,
topic string,
db *sql.DB,
) codec.RowEventDecoder {
return &decoder{
config: config,
topic: topic,
schemaM: schemaM,
config: config,
topic: topic,
schemaM: schemaM,
upstreamTiDB: db,
}
}

Expand Down Expand Up @@ -129,6 +135,13 @@
if err != nil {
return nil, errors.Trace(err)
}
corrupted := isCorrupted(valueMap)
if found {
event.Checksum = &integrity.Checksum{
Current: uint32(expectedChecksum),
Corrupted: corrupted,
}
}

Check warning on line 144 in pkg/sink/codec/avro/decoder.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/avro/decoder.go#L140-L144

Added lines #L140 - L144 were not covered by tests

if isCorrupted(valueMap) {
log.Warn("row data is corrupted",
Expand All @@ -146,7 +159,7 @@
}

if found {
if err := common.VerifyChecksum(event.Columns, event.TableInfo.Columns, uint32(expectedChecksum)); err != nil {
if err = common.VerifyChecksum(event, d.upstreamTiDB); err != nil {

Check warning on line 162 in pkg/sink/codec/avro/decoder.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/avro/decoder.go#L162

Added line #L162 was not covered by tests
return nil, errors.Trace(err)
}
}
Expand Down
112 changes: 112 additions & 0 deletions pkg/sink/codec/common/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

"github.com/go-sql-driver/mysql"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/errors"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -76,6 +77,117 @@
return timezone
}

func queryRowChecksum(
ctx context.Context, db *sql.DB, event *model.RowChangedEvent,
) error {
var (
schema = event.TableInfo.GetSchemaName()
table = event.TableInfo.GetTableName()
commitTs = event.GetCommitTs()
)

pkNames := event.TableInfo.GetPrimaryKeyColumnNames()
if len(pkNames) == 0 {
log.Warn("cannot query row checksum without primary key",
zap.String("schema", schema), zap.String("table", table))
return nil
}

Check warning on line 94 in pkg/sink/codec/common/helper.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/helper.go#L82-L94

Added lines #L82 - L94 were not covered by tests

conn, err := db.Conn(ctx)
if err != nil {
log.Panic("establish connection to the upstream tidb failed",
zap.String("schema", schema), zap.String("table", table),
zap.Uint64("commitTs", commitTs), zap.Error(err))
}
defer conn.Close()

if event.Checksum.Current != 0 {
conditions := make(map[string]interface{})
for _, name := range pkNames {
for _, col := range event.Columns {
if event.TableInfo.ForceGetColumnName(col.ColumnID) == name {
conditions[name] = col.Value
}

Check warning on line 110 in pkg/sink/codec/common/helper.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/helper.go#L96-L110

Added lines #L96 - L110 were not covered by tests
}
}
result := queryRowChecksumAux(ctx, conn, commitTs, schema, table, conditions)
if result != 0 && result != event.Checksum.Current {
log.Error("verify upstream TiDB columns-level checksum, current checksum mismatch",
zap.Uint32("expected", event.Checksum.Current),
zap.Uint32("actual", result))
return errors.New("checksum mismatch")
}

Check warning on line 119 in pkg/sink/codec/common/helper.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/helper.go#L113-L119

Added lines #L113 - L119 were not covered by tests
}

if event.Checksum.Previous != 0 {
conditions := make(map[string]interface{})
for _, name := range pkNames {
for _, col := range event.PreColumns {
if event.TableInfo.ForceGetColumnName(col.ColumnID) == name {
conditions[name] = col.Value
}

Check warning on line 128 in pkg/sink/codec/common/helper.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/helper.go#L122-L128

Added lines #L122 - L128 were not covered by tests
}
}
result := queryRowChecksumAux(ctx, conn, commitTs-1, schema, table, conditions)
if result != 0 && result != event.Checksum.Previous {
log.Error("verify upstream TiDB columns-level checksum, previous checksum mismatch",
zap.Uint32("expected", event.Checksum.Previous),
zap.Uint32("actual", result))
return errors.New("checksum mismatch")
}

Check warning on line 137 in pkg/sink/codec/common/helper.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/helper.go#L131-L137

Added lines #L131 - L137 were not covered by tests
}

return nil

Check warning on line 140 in pkg/sink/codec/common/helper.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/helper.go#L140

Added line #L140 was not covered by tests
}

func queryRowChecksumAux(
ctx context.Context, conn *sql.Conn, commitTs uint64, schema string, table string, conditions map[string]interface{},
) uint32 {
var result uint32
// 1. set snapshot read
query := fmt.Sprintf("set @@tidb_snapshot=%d", commitTs)
_, err := conn.ExecContext(ctx, query)
if err != nil {
mysqlErr, ok := errors.Cause(err).(*mysql.MySQLError)
if ok {
// Error 8055 (HY000): snapshot is older than GC safe point
if mysqlErr.Number == 8055 {
log.Error("set snapshot read failed, since snapshot is older than GC safe point")
}

Check warning on line 156 in pkg/sink/codec/common/helper.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/helper.go#L145-L156

Added lines #L145 - L156 were not covered by tests
}

log.Error("set snapshot read failed",
zap.String("query", query),
zap.String("schema", schema), zap.String("table", table),
zap.Uint64("commitTs", commitTs), zap.Error(err))
return result

Check warning on line 163 in pkg/sink/codec/common/helper.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/helper.go#L159-L163

Added lines #L159 - L163 were not covered by tests
}

query = fmt.Sprintf("select tidb_row_checksum() from %s.%s where ", schema, table)
var whereClause string
for name, value := range conditions {
if whereClause != "" {
whereClause += " and "
}
switch value.(type) {
case []byte, string:
whereClause += fmt.Sprintf("%s = '%v'", name, value)
default:
whereClause += fmt.Sprintf("%s = %v", name, value)

Check warning on line 176 in pkg/sink/codec/common/helper.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/helper.go#L166-L176

Added lines #L166 - L176 were not covered by tests
}
}
query += whereClause

err = conn.QueryRowContext(ctx, query).Scan(&result)
if err != nil {
log.Panic("scan row failed",
zap.String("query", query),
zap.String("schema", schema), zap.String("table", table),
zap.Uint64("commitTs", commitTs), zap.Error(err))
}
return result

Check warning on line 188 in pkg/sink/codec/common/helper.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/helper.go#L179-L188

Added lines #L179 - L188 were not covered by tests
}

// MustSnapshotQuery query the db by the snapshot read with the given commitTs
func MustSnapshotQuery(
ctx context.Context, db *sql.DB, commitTs uint64, schema, table string, conditions map[string]interface{},
Expand Down
57 changes: 44 additions & 13 deletions pkg/sink/codec/common/verify_checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
package common

import (
"context"
"database/sql"
"encoding/binary"
"fmt"
"hash/crc32"
"math"
"strconv"
Expand All @@ -29,24 +32,52 @@
)

// VerifyChecksum calculate the checksum value, and compare it with the expected one, return error if not identical.
func VerifyChecksum(columns []*model.ColumnData, columnInfo []*timodel.ColumnInfo, expected uint32) error {
func VerifyChecksum(event *model.RowChangedEvent, db *sql.DB) error {

Check warning on line 35 in pkg/sink/codec/common/verify_checksum.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/verify_checksum.go#L35

Added line #L35 was not covered by tests
// if expected is 0, it means the checksum is not enabled, so we don't need to verify it.
// the data maybe restored by br, and the checksum is not enabled, so no expected here.
if expected == 0 {
return nil
}
checksum, err := calculateChecksum(columns, columnInfo)
if err != nil {
return errors.Trace(err)
if event.Checksum.Current != 0 {
checksum, err := calculateChecksum(event.Columns, event.TableInfo.Columns)
if err != nil {
return errors.Trace(err)
}
if checksum != event.Checksum.Current {
log.Error("current checksum mismatch",
zap.Uint32("expected", event.Checksum.Current), zap.Uint32("actual", checksum), zap.Any("event", event))
for _, col := range event.Columns {
colInfo := event.TableInfo.ForceGetColumnInfo(col.ColumnID)
log.Info("data corrupted, print each column for debugging",
zap.String("name", colInfo.Name.O), zap.Any("type", colInfo.GetType()),
zap.Any("charset", colInfo.GetCharset()), zap.Any("flag", colInfo.GetFlag()),
zap.Any("value", col.Value), zap.Any("default", colInfo.GetDefaultValue()))
}
return fmt.Errorf("current checksum mismatch, current: %d, expected: %d", checksum, event.Checksum.Current)

Check warning on line 53 in pkg/sink/codec/common/verify_checksum.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/verify_checksum.go#L38-L53

Added lines #L38 - L53 were not covered by tests
}
}
if checksum != expected {
log.Error("checksum mismatch",
zap.Uint32("expected", expected),
zap.Uint32("actual", checksum))
return errors.New("checksum mismatch")
if event.Checksum.Previous != 0 {
checksum, err := calculateChecksum(event.PreColumns, event.TableInfo.Columns)
if err != nil {
return errors.Trace(err)
}
if checksum != event.Checksum.Previous {
log.Error("previous checksum mismatch",
zap.Uint32("expected", event.Checksum.Previous),
zap.Uint32("actual", checksum), zap.Any("event", event))
for _, col := range event.PreColumns {
colInfo := event.TableInfo.ForceGetColumnInfo(col.ColumnID)
log.Info("data corrupted, print each column for debugging",
zap.String("name", colInfo.Name.O), zap.Any("type", colInfo.GetType()),
zap.Any("charset", colInfo.GetCharset()), zap.Any("flag", colInfo.GetFlag()),
zap.Any("value", col.Value), zap.Any("default", colInfo.GetDefaultValue()))
}
return fmt.Errorf("previous checksum mismatch, current: %d, expected: %d", checksum, event.Checksum.Previous)

Check warning on line 72 in pkg/sink/codec/common/verify_checksum.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/verify_checksum.go#L56-L72

Added lines #L56 - L72 were not covered by tests
}
}

return nil
if db == nil {
return nil
}

Check warning on line 78 in pkg/sink/codec/common/verify_checksum.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/verify_checksum.go#L76-L78

Added lines #L76 - L78 were not covered by tests
// also query the upstream TiDB to get the columns-level checksum
return queryRowChecksum(context.Background(), db, event)

Check warning on line 80 in pkg/sink/codec/common/verify_checksum.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/verify_checksum.go#L80

Added line #L80 was not covered by tests
}

// calculate the checksum, caller should make sure all columns is ordered by the column's id.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sink/codec/simple/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (d *Decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
return nil, nil
}

event, err := buildRowChangedEvent(d.msg, tableInfo, d.config.EnableRowChecksum)
event, err := buildRowChangedEvent(d.msg, tableInfo, d.config.EnableRowChecksum, d.upstreamTiDB)
d.msg = nil

log.Debug("row changed event assembled", zap.Any("event", event))
Expand Down
Loading
Loading