Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DNM: play with chunk #11539

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions cdc/entry/chunk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package entry

import (
"bytes"
"strings"
"time"

"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/rowcodec"
"github.com/pingcap/tiflow/cdc/model"
"go.uber.org/zap"
)

const (
RowTypeDelete = iota
RowTypeInsert
RowTypeUpdate
)

type TxnEvent struct {
TableInfo *model.TableInfo `json:"table_info"`
PhysicalTableID int64 `json:"physical_table_id"`
StartTs uint64 `json:"start_ts"`
CommitTs uint64 `json:"commit_ts"`
Rows *chunk.Chunk `json:"rows"`
RowType []int `json:"row_type"`
}

func rawKVToChunk(raw *model.RawKVEntry, tableInfo *model.TableInfo, tz *time.Location, rowCount int) *chunk.Chunk {
recordID, err := tablecodec.DecodeRowKey(raw.Key)
if err != nil {
log.Panic("decode row key failed", zap.Error(err))
}
if !bytes.HasPrefix(raw.Key, tablePrefix) {
return nil
}
// key, physicalTableID, err := decodeTableID(raw.Key)
// if err != nil {
// return nil
// }
if len(raw.OldValue) == 0 && len(raw.Value) == 0 {
return nil
}
handleColIDs, _, reqCols := tableInfo.GetRowColInfos()
// This function is used to set the default value for the column that
// is not in the raw data.
defVal := func(i int, chk *chunk.Chunk) error {
if reqCols[i].ID < 0 {
// model.ExtraHandleID, ExtraPidColID, ExtraPhysTblID... etc
// Don't set the default value for that column.
chk.AppendNull(i)
return nil
}
ci, ok := tableInfo.GetColumnInfo(reqCols[i].ID)
if !ok {
log.Panic("column not found", zap.Int64("columnID", reqCols[i].ID))
}

colDatum, _, _, warn, err := getDefaultOrZeroValue(ci, tz)
if err != nil {
return err
}
if warn != "" {
log.Warn(warn, zap.String("table", tableInfo.TableName.String()),
zap.String("column", ci.Name.String()))
}
chk.AppendDatum(i, &colDatum)
return nil
}
chunkDecoder := rowcodec.NewChunkDecoder(reqCols, handleColIDs, defVal, tz)
chk := chunk.NewChunkWithCapacity(tableInfo.GetFileSlice(), rowCount)
for i := 0; i < rowCount; i++ {
err = chunkDecoder.DecodeToChunk(raw.Value, recordID, chk)
}
if err != nil {
log.Panic("decode row failed", zap.Error(err))
}
return chk
}

type row struct {
Columns []*model.ColumnData `json:"columns"`
PreColumns []*model.ColumnData `json:"pre_columns"`
}

func (r row) String() string {
sb := strings.Builder{}
sb.WriteString("row: ")
for _, col := range r.Columns {
sb.WriteString(col.String())
}
return sb.String()
}

func chunkToRows(chk *chunk.Chunk, tableInfo *model.TableInfo) []row {
rows := make([]row, 0, chk.NumRows())
fieldSlice := tableInfo.GetFileSlice()
for i := 0; i < chk.NumRows(); i++ {
row := row{Columns: make([]*model.ColumnData, 0, chk.NumCols())}
for j := 0; j < chk.NumCols(); j++ {
col := chk.GetRow(i).GetDatum(j, fieldSlice[j])
rv := col.GetValue()
//v, _, _, _ := formatColVal(col, tableInfo.Columns[j])
row.Columns = append(row.Columns, &model.ColumnData{
ColumnID: tableInfo.Columns[j].ID,
Value: rv,
})
}
rows = append(rows, row)
}
return rows
}
117 changes: 117 additions & 0 deletions cdc/entry/chunk_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package entry

import (
"sync"
"sync/atomic"
"testing"

"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/integrity"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
)

type prepareData struct {
mu sync.Mutex
tableInfo *model.TableInfo
rawKv *model.RawKVEntry
isReady atomic.Bool
}

var pData = &prepareData{}

func prepare() {
replicaConfig := config.GetDefaultReplicaConfig()
t := &testing.T{}
helper := NewSchemaTestHelperWithReplicaConfig(t, replicaConfig)
defer helper.Close()

helper.Tk().MustExec("use test")
sql := `create table t (id int primary key, name varchar(255))`
_ = helper.DDL2Event(sql)
rawKV := helper.DML2RawKV(`insert into t (id, name) values (1, "jiangjiang")`, "test", "t")

tableInfo, _ := helper.schemaStorage.GetLastSnapshot().TableByName("test", "t")
log.Info("fizz tableInfo", zap.Any("tableName", tableInfo.Name))
pData.mu.Lock()
defer pData.mu.Unlock()
pData.tableInfo = tableInfo
pData.rawKv = rawKV
pData.isReady.Store(true)
}

func getPData() *prepareData {
if !pData.isReady.Load() {
prepare()
}
return pData
}

func BenchmarkRawKV_To_Chunk_1RowTxn(b *testing.B) {
tz, err := util.GetTimezone(config.GetGlobalServerConfig().TZ)
if err != nil {
log.Panic("invalid timezone", zap.Error(err))
}
pData := getPData()
for i := 0; i < b.N; i++ {
_ = rawKVToChunk(pData.rawKv, pData.tableInfo, tz, 1)
}
}

func BenchmarkRawKV_To_RowKVEntry_1RowTxn(b *testing.B) {
pData := getPData()
base := baseKVEntry{}
m := mounter{}
for i := 0; i < b.N; i++ {
_, _ = m.unmarshalRowKVEntry(pData.tableInfo, pData.rawKv.Key, pData.rawKv.Value, nil, base)
}
}

func BenchmarkRawKV_To_Chunk_100RowTxn(b *testing.B) {
tz, err := util.GetTimezone(config.GetGlobalServerConfig().TZ)
if err != nil {
log.Panic("invalid timezone", zap.Error(err))
}
pData := getPData()
for i := 0; i < b.N; i++ {
_ = rawKVToChunk(pData.rawKv, pData.tableInfo, tz, 100)
}
}

func BenchmarkRawKV_To_RowKVEntry_100RowTxn(b *testing.B) {
pData := getPData()
base := baseKVEntry{}
m := mounter{}
for i := 0; i < b.N; i++ {
for j := 0; j < 100; j++ {
_, _ = m.unmarshalRowKVEntry(pData.tableInfo, pData.rawKv.Key, pData.rawKv.Value, nil, base)
}
}
}

func BenchmarkRawKV_To_Chunk_To_Value_100RowTxn(b *testing.B) {
tz, err := util.GetTimezone(config.GetGlobalServerConfig().TZ)
if err != nil {
log.Panic("invalid timezone", zap.Error(err))
}
pData := getPData()
for i := 0; i < b.N; i++ {
chk := rawKVToChunk(pData.rawKv, pData.tableInfo, tz, 100)
_ = chunkToRows(chk, pData.tableInfo)
}
}

func BenchmarkRawKV_To_RowKVEntry_To_Value_100RowTxn(b *testing.B) {
pData := getPData()
base := baseKVEntry{}
m := mounter{}
m.integrity = &integrity.Config{}
for i := 0; i < b.N; i++ {
for j := 0; j < 100; j++ {
row, _ := m.unmarshalRowKVEntry(pData.tableInfo, pData.rawKv.Key, pData.rawKv.Value, nil, base)
m.mountRowKVEntry(pData.tableInfo, row, pData.rawKv.Key, pData.rawKv.ApproximateDataSize())
}
}
}
77 changes: 77 additions & 0 deletions cdc/entry/chunk_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package entry

import (
"testing"
"time"

"github.com/pingcap/log"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/util"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestRawKVToChunk(t *testing.T) {
replicaConfig := config.GetDefaultReplicaConfig()

helper := NewSchemaTestHelperWithReplicaConfig(t, replicaConfig)
defer helper.Close()

helper.Tk().MustExec("set global tidb_enable_row_level_checksum = 1")
helper.Tk().MustExec("use test")

sql := `create table t (id int primary key, name varchar(255))`
_ = helper.DDL2Event(sql)
rawKV := helper.DML2RawKV(`insert into t (id, name) values (1, "jiangjiang")`, "test", "t")
require.NotNil(t, rawKV)

tableInfo, ok := helper.schemaStorage.GetLastSnapshot().TableByName("test", "t")
require.True(t, ok)
require.NotNil(t, tableInfo)
tz, err := util.GetTimezone(config.GetGlobalServerConfig().TZ)
require.NoError(t, err)
start := time.Now()
chk := rawKVToChunk(rawKV, tableInfo, tz, 100)
require.NotNil(t, chk)
require.Equal(t, 2, chk.NumCols())
require.Equal(t, 100, chk.NumRows())

// for i := 0; i < 100; i++ {
// log.Info("fizz rawKv", zap.Any("rawKv", string(rawKV.Value)))
// info := tableInfo.Clone()
// _ = rawKVToChunk(rawKV, info, tz, 100)
// }
t.Logf("fizz time: %v", time.Since(start))
}

func TestChunkToRow(t *testing.T) {
replicaConfig := config.GetDefaultReplicaConfig()

helper := NewSchemaTestHelperWithReplicaConfig(t, replicaConfig)
defer helper.Close()

helper.Tk().MustExec("set global tidb_enable_row_level_checksum = 1")
helper.Tk().MustExec("use test")

sql := `create table t (id int primary key, name varchar(255))`
_ = helper.DDL2Event(sql)
rawKV := helper.DML2RawKV(`insert into t (id, name) values (1, "jiangjiang")`, "test", "t")
require.NotNil(t, rawKV)

tableInfo, ok := helper.schemaStorage.GetLastSnapshot().TableByName("test", "t")
require.True(t, ok)
require.NotNil(t, tableInfo)
tz, err := util.GetTimezone(config.GetGlobalServerConfig().TZ)
require.NoError(t, err)
chk := rawKVToChunk(rawKV, tableInfo, tz, 100)
require.NotNil(t, chk)
require.Equal(t, 2, chk.NumCols())
require.Equal(t, 100, chk.NumRows())

rows := chunkToRows(chk, tableInfo)
require.Len(t, rows, 100)
for _, row := range rows {
require.Len(t, row.Columns, 2)
}
log.Info("fizz rows", zap.Stringer("10nd row", rows[10]))
}
1 change: 0 additions & 1 deletion cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,6 @@ func (m *mounter) decodeRow(
datums map[int64]types.Datum
err error
)

if rowcodec.IsNewFormat(rawValue) {
decoder := rowcodec.NewDatumMapDecoder(reqCols, m.tz)
if isPreColumns {
Expand Down
18 changes: 18 additions & 0 deletions cdc/entry/schema_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,24 @@ func (s *SchemaTestHelper) DML2Event(dml string, schema, table string) *model.Ro
return polymorphicEvent.Row
}

func (s *SchemaTestHelper) DML2RawKV(dml string, schema, table string) *model.RawKVEntry {
s.tk.MustExec(dml)
tableInfo, ok := s.schemaStorage.GetLastSnapshot().TableByName(schema, table)
require.True(s.t, ok)

key, value := s.getLastKeyValue(tableInfo.ID)
ts := s.schemaStorage.GetLastSnapshot().CurrentTs()
rawKV := &model.RawKVEntry{
OpType: model.OpTypePut,
Key: key,
Value: value,
OldValue: nil,
StartTs: ts - 1,
CRTs: ts + 1,
}
return rawKV
}

func (s *SchemaTestHelper) getLastKeyValue(tableID int64) (key, value []byte) {
txn, err := s.storage.Begin()
require.NoError(s.t, err)
Expand Down
10 changes: 8 additions & 2 deletions cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ type TableInfo struct {
// The following 3 fields, should only be used to decode datum from the raw value bytes, do not abuse those field.
// rowColInfos extend the model.ColumnInfo with some extra information
// it's the same length and order with the model.TableInfo.Columns
rowColInfos []rowcodec.ColInfo
rowColFieldTps map[int64]*types.FieldType
rowColInfos []rowcodec.ColInfo
rowColFieldTps map[int64]*types.FieldType
rowColFieldTpsSlice []*types.FieldType
// only for new row format decoder
handleColID []int64

Expand Down Expand Up @@ -155,6 +156,7 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode
VirtualGenCol: col.IsGenerated(),
}
ti.rowColFieldTps[col.ID] = ti.rowColInfos[i].Ft
ti.rowColFieldTpsSlice = append(ti.rowColFieldTpsSlice, ti.rowColInfos[i].Ft)
}

for _, idx := range ti.Indices {
Expand Down Expand Up @@ -366,6 +368,10 @@ func (ti *TableInfo) GetRowColInfos() ([]int64, map[int64]*types.FieldType, []ro
return ti.handleColID, ti.rowColFieldTps, ti.rowColInfos
}

func (ti *TableInfo) GetFileSlice() []*types.FieldType {
return ti.rowColFieldTpsSlice
}

// GetColInfosForRowChangedEvent return column infos for non-virtual columns
// The column order in the result is the same as the order in its corresponding RowChangedEvent
func (ti *TableInfo) GetColInfosForRowChangedEvent() []rowcodec.ColInfo {
Expand Down
4 changes: 4 additions & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,10 @@ type ColumnData struct {
ApproximateBytes int `json:"-" msg:"-"`
}

func (c *ColumnData) String() string {
return fmt.Sprintf("ColumnID: %d, Value: %v", c.ColumnID, c.Value)
}

// RedoColumn stores Column change
type RedoColumn struct {
// Fields from Column and can't be marshaled directly in Column.
Expand Down
Loading