Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mounter(ticdc): decode bytes-level checksum and encode columns-level checksum (#10706) #11380

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 174 additions & 33 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"encoding/json"
"fmt"
"math"
"reflect"
"sort"
"time"
"unsafe"
Expand All @@ -29,7 +30,6 @@
"github.com/pingcap/tidb/pkg/kv"
timodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/types"
Expand Down Expand Up @@ -100,11 +100,6 @@
// they should not be nil after decode at least one event in the row format v2.
decoder *rowcodec.DatumMapDecoder
preDecoder *rowcodec.DatumMapDecoder

// encoder is used to calculate the checksum.
encoder *rowcodec.Encoder
// sctx hold some information can be used by the encoder to calculate the checksum.
sctx *stmtctx.StatementContext
}

// NewMounter creates a mounter
Expand All @@ -124,9 +119,6 @@
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
tz: tz,
integrity: integrity,

encoder: &rowcodec.Encoder{},
sctx: stmtctx.NewStmtCtxWithTimeZone(tz),
}
}

Expand Down Expand Up @@ -156,6 +148,8 @@
if !bytes.HasPrefix(raw.Key, tablePrefix) {
return nil, nil
}
// checksumKey is only used to calculate raw checksum if necessary.
checksumKey := raw.Key
key, physicalTableID, err := decodeTableID(raw.Key)
if err != nil {
return nil, err
Expand Down Expand Up @@ -207,7 +201,7 @@
if rowKV == nil {
return nil, nil
}
row, rawRow, err := m.mountRowKVEntry(tableInfo, rowKV, raw.ApproximateDataSize())
row, rawRow, err := m.mountRowKVEntry(tableInfo, rowKV, checksumKey, raw.ApproximateDataSize())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -297,7 +291,6 @@
if err != nil {
return nil, false, errors.Trace(err)
}

datums, err = tablecodec.DecodeHandleToDatumMap(
recordID, handleColIDs, handleColFt, m.tz, datums)
if err != nil {
Expand Down Expand Up @@ -455,8 +448,8 @@
return cols, rawCols, columnInfos, nil
}

func (m *mounter) calculateChecksum(
columnInfos []*timodel.ColumnInfo, rawColumns []types.Datum,
func calculateColumnChecksum(
columnInfos []*timodel.ColumnInfo, rawColumns []types.Datum, tz *time.Location,
) (uint32, error) {
columns := make([]rowcodec.ColData, 0, len(rawColumns))
for idx, col := range columnInfos {
Expand All @@ -475,37 +468,24 @@
Data: make([]byte, 0),
}

checksum, err := calculator.Checksum(m.tz)
checksum, err := calculator.Checksum(tz)
if err != nil {
return 0, errors.Trace(err)
}
return checksum, nil
}

// return error when calculate the checksum failed.
// return false if the checksum is not matched
func (m *mounter) verifyChecksum(
columnInfos []*timodel.ColumnInfo, rawColumns []types.Datum, isPreRow bool,
func verifyColumnChecksum(
columnInfos []*timodel.ColumnInfo, rawColumns []types.Datum, decoder *rowcodec.DatumMapDecoder, tz *time.Location,
) (uint32, bool, error) {
if !m.integrity.Enabled() {
return 0, true, nil
}

var decoder *rowcodec.DatumMapDecoder
if isPreRow {
decoder = m.preDecoder
} else {
decoder = m.decoder
}

// if the checksum cannot be found, which means the upstream TiDB checksum is not enabled,
// so return matched as true to skip check the event.
first, ok := decoder.GetChecksum()
if !ok {
return 0, true, nil
}

checksum, err := m.calculateChecksum(columnInfos, rawColumns)
checksum, err := calculateColumnChecksum(columnInfos, rawColumns, tz)

Check warning on line 488 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L488

Added line #L488 was not covered by tests
if err != nil {
log.Error("failed to calculate the checksum", zap.Uint32("first", first), zap.Error(err))
return 0, false, err
Expand Down Expand Up @@ -540,7 +520,168 @@
return checksum, false, nil
}

func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, dataSize int64) (*model.RowChangedEvent, model.RowChangedDatums, error) {
// todo: do we really need this? how about the datum.ConvertTo ?
func newDatum(value interface{}, ft types.FieldType) (types.Datum, error) {
if value == nil {
return types.NewDatum(nil), nil
}

Check warning on line 527 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L526-L527

Added lines #L526 - L527 were not covered by tests
switch ft.GetType() {
case mysql.TypeTiny, mysql.TypeShort, mysql.TypeLong, mysql.TypeLonglong, mysql.TypeInt24, mysql.TypeYear:
switch v := value.(type) {
case uint64:
return types.NewUintDatum(v), nil
case int64:
return types.NewIntDatum(v), nil
}
case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeNewDate, mysql.TypeTimestamp:
// todo: DefaultStmtNoWarningContext timezone is set to UTC, is it correct?
t, err := types.ParseTime(types.DefaultStmtNoWarningContext, value.(string), ft.GetType(), ft.GetDecimal())
if err != nil {
return types.Datum{}, errors.Trace(err)
}

Check warning on line 541 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L540-L541

Added lines #L540 - L541 were not covered by tests
return types.NewTimeDatum(t), nil
case mysql.TypeDuration:
d, _, err := types.ParseDuration(types.StrictContext, value.(string), ft.GetDecimal())
if err != nil {
return types.Datum{}, errors.Trace(err)
}

Check warning on line 547 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L546-L547

Added lines #L546 - L547 were not covered by tests
return types.NewDurationDatum(d), nil
case mysql.TypeJSON:
bj, err := types.ParseBinaryJSONFromString(value.(string))
if err != nil {
return types.Datum{}, errors.Trace(err)
}

Check warning on line 553 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L552-L553

Added lines #L552 - L553 were not covered by tests
return types.NewJSONDatum(bj), nil
case mysql.TypeNewDecimal:
mysqlDecimal := new(types.MyDecimal)
err := mysqlDecimal.FromString([]byte(value.(string)))
if err != nil {
return types.Datum{}, errors.Trace(err)
}

Check warning on line 560 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L559-L560

Added lines #L559 - L560 were not covered by tests
datum := types.NewDecimalDatum(mysqlDecimal)
datum.SetLength(ft.GetFlen())
datum.SetFrac(ft.GetDecimal())
return datum, nil
case mysql.TypeEnum:
enum, err := types.ParseEnumValue(ft.GetElems(), value.(uint64))
if err != nil {
return types.Datum{}, errors.Trace(err)
}

Check warning on line 569 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L568-L569

Added lines #L568 - L569 were not covered by tests
return types.NewMysqlEnumDatum(enum), nil
case mysql.TypeSet:
set, err := types.ParseSetValue(ft.GetElems(), value.(uint64))
if err != nil {
return types.Datum{}, errors.Trace(err)
}

Check warning on line 575 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L574-L575

Added lines #L574 - L575 were not covered by tests
return types.NewMysqlSetDatum(set, ft.GetCollate()), nil
case mysql.TypeBit:
byteSize := (ft.GetFlen() + 7) >> 3
binaryLiteral := types.NewBinaryLiteralFromUint(value.(uint64), byteSize)
return types.NewMysqlBitDatum(binaryLiteral), nil
case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar,
mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob:
switch v := value.(type) {
case []byte:
return types.NewBytesDatum(v), nil
case string:
return types.NewBytesDatum([]byte(v)), nil

Check warning on line 587 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L586-L587

Added lines #L586 - L587 were not covered by tests
}
log.Panic("unknown data type when build datum",
zap.Any("type", ft.GetType()), zap.Any("value", value), zap.Reflect("type", reflect.TypeOf(value)))

Check warning on line 590 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L589-L590

Added lines #L589 - L590 were not covered by tests
case mysql.TypeFloat:
return types.NewFloat32Datum(value.(float32)), nil
case mysql.TypeDouble:
return types.NewFloat64Datum(value.(float64)), nil
default:
log.Panic("unexpected mysql type found", zap.Any("type", ft.GetType()))

Check warning on line 596 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L595-L596

Added lines #L595 - L596 were not covered by tests
}
return types.Datum{}, nil

Check warning on line 598 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L598

Added line #L598 was not covered by tests
}

func verifyRawBytesChecksum(
tableInfo *model.TableInfo, columns []*model.ColumnData, decoder *rowcodec.DatumMapDecoder,
key kv.Key, tz *time.Location,
) (uint32, bool, error) {
expected, ok := decoder.GetChecksum()
if !ok {
return 0, true, nil
}

Check warning on line 608 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L607-L608

Added lines #L607 - L608 were not covered by tests
var (
columnIDs []int64
datums []*types.Datum
)
for _, col := range columns {
// TiDB does not encode null value into the bytes, so just ignore it.
if col.Value == nil {
continue
}
columnID := col.ColumnID
columnInfo := tableInfo.ForceGetColumnInfo(columnID)
datum, err := newDatum(col.Value, columnInfo.FieldType)
if err != nil {
return 0, false, errors.Trace(err)
}

Check warning on line 623 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L622-L623

Added lines #L622 - L623 were not covered by tests
datums = append(datums, &datum)
columnIDs = append(columnIDs, columnID)
}
obtained, err := decoder.CalculateRawChecksum(tz, columnIDs, datums, key, nil)
if err != nil {
return 0, false, errors.Trace(err)
}

Check warning on line 630 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L629-L630

Added lines #L629 - L630 were not covered by tests
if obtained == expected {
return expected, true, nil
}

log.Error("raw bytes checksum mismatch",
zap.Uint32("expected", expected), zap.Uint32("obtained", obtained))

return expected, false, nil

Check warning on line 638 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L635-L638

Added lines #L635 - L638 were not covered by tests
}

// return error when calculate the checksum.
// return false if the checksum is not matched
func (m *mounter) verifyChecksum(
tableInfo *model.TableInfo, columnInfos []*timodel.ColumnInfo,
columns []*model.ColumnData, rawColumns []types.Datum,
key kv.Key, isPreRow bool,
) (uint32, bool, error) {
if !m.integrity.Enabled() {
return 0, true, nil
}

var decoder *rowcodec.DatumMapDecoder
if isPreRow {
decoder = m.preDecoder
} else {
decoder = m.decoder
}

version := decoder.ChecksumVersion()
switch version {
case 0:
return verifyColumnChecksum(columnInfos, rawColumns, decoder, m.tz)

Check warning on line 662 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L661-L662

Added lines #L661 - L662 were not covered by tests
case 1:
expected, matched, err := verifyRawBytesChecksum(tableInfo, columns, decoder, key, m.tz)
if err != nil {
return 0, false, errors.Trace(err)
}

Check warning on line 667 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L666-L667

Added lines #L666 - L667 were not covered by tests
if !matched {
return expected, matched, err
}

Check warning on line 670 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L669-L670

Added lines #L669 - L670 were not covered by tests
columnChecksum, err := calculateColumnChecksum(columnInfos, rawColumns, m.tz)
if err != nil {
log.Error("failed to calculate column-level checksum, after raw checksum verification passed", zap.Error(err))
return 0, false, errors.Trace(err)
}

Check warning on line 675 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L673-L675

Added lines #L673 - L675 were not covered by tests
return columnChecksum, true, nil
default:

Check warning on line 677 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L677

Added line #L677 was not covered by tests
}
return 0, false, errors.Errorf("unknown checksum version %d", version)

Check warning on line 679 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L679

Added line #L679 was not covered by tests
}

func (m *mounter) mountRowKVEntry(
tableInfo *model.TableInfo, row *rowKVEntry, key kv.Key, dataSize int64,
) (*model.RowChangedEvent, model.RowChangedDatums, error) {
var (
rawRow model.RowChangedDatums
columnInfos []*timodel.ColumnInfo
Expand Down Expand Up @@ -573,7 +714,7 @@
return nil, rawRow, errors.Trace(err)
}

preChecksum, matched, err = m.verifyChecksum(columnInfos, preRawCols, true)
preChecksum, matched, err = m.verifyChecksum(tableInfo, columnInfos, preCols, preRawCols, key, true)
if err != nil {
log.Error("calculate the previous columns checksum failed",
zap.Any("tableInfo", tableInfo),
Expand Down Expand Up @@ -605,7 +746,7 @@
return nil, rawRow, errors.Trace(err)
}

currentChecksum, matched, err = m.verifyChecksum(columnInfos, rawCols, false)
currentChecksum, matched, err = m.verifyChecksum(tableInfo, columnInfos, cols, rawCols, key, false)
if err != nil {
log.Error("calculate the current columns checksum failed",
zap.Any("tableInfo", tableInfo),
Expand Down
Loading
Loading