
Commit

fix
CharlesCheung96 committed Jun 20, 2024
1 parent 7c71805 commit 6be470b
Showing 7 changed files with 125 additions and 4 deletions.
6 changes: 3 additions & 3 deletions Makefile
@@ -234,9 +234,9 @@ check_third_party_binary:
 	@which bin/sync_diff_inspector
 	@which bin/go-ycsb
 	@which bin/etcdctl
-	@which bin/jq
-	@which bin/minio
-	@which bin/bin/schema-registry-start
+	# @which bin/jq
+	# @which bin/minio
+	# @which bin/bin/schema-registry-start
 
 integration_test_build: check_failpoint_ctl storage_consumer kafka_consumer pulsar_consumer oauth2_server
 	$(FAILPOINT_ENABLE)
2 changes: 2 additions & 0 deletions cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go
@@ -130,5 +130,7 @@ func (r *IndexValueDispatcher) DispatchRowChangedEvent(row *model.RowChangedEven
 	}
 
 	sum32 := r.hasher.Sum32()
+	log.Debug("dispatch row changed event", zap.String("table", row.TableInfo.GetTableName()),
+		zap.Int32("partitionNum", partitionNum), zap.Uint32("sum32", sum32))
 	return int32(sum32 % uint32(partitionNum)), strconv.FormatInt(int64(sum32), 10), nil
 }
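
For context on what the new debug log reports: an index-value dispatcher routes each row by hashing its key columns and taking the 32-bit sum modulo the partition count. The sketch below is a simplified standalone approximation of that idea, not the actual TiCDC dispatcher; the keyColumn type and dispatchByIndexValue function are made up for illustration.

package main

import (
	"fmt"
	"hash/fnv"
)

// keyColumn is a hypothetical stand-in for one key (handle/index) column of a row.
type keyColumn struct {
	Name  string
	Value string
}

// dispatchByIndexValue hashes the key column names and values and maps the
// 32-bit sum onto one of partitionNum partitions, mirroring the
// sum32 % partitionNum step seen in the diff above.
func dispatchByIndexValue(cols []keyColumn, partitionNum int32) (int32, uint32) {
	h := fnv.New32a()
	for _, c := range cols {
		h.Write([]byte(c.Name))
		h.Write([]byte(c.Value))
	}
	sum32 := h.Sum32()
	return int32(sum32 % uint32(partitionNum)), sum32
}

func main() {
	// Rows that differ only in the key value will usually land on different partitions.
	p1, s1 := dispatchByIndexValue([]keyColumn{{Name: "a", Value: "1"}}, 6)
	p2, s2 := dispatchByIndexValue([]keyColumn{{Name: "a", Value: "2"}}, 6)
	fmt.Printf("a=1: sum32=%d partition=%d\n", s1, p1)
	fmt.Printf("a=2: sum32=%d partition=%d\n", s2, p2)
}

Because the hash covers the key values, an UPDATE that changes the key (as the data.sql added later in this commit does) maps the old and new row images to different partitions, which is the situation this test exercises.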
5 changes: 4 additions & 1 deletion cdc/sink/dmlsink/mq/mq_dml_sink.go
@@ -213,7 +213,10 @@ func (s *dmlSink) writeEvent(txn *dmlsink.CallbackableEvent[*model.SingleTableTx

 	// Note: Calculate the partition index after the transformer is applied.
 	// Because the transformer may change the row of the event.
-	index, key, err := s.alive.eventRouter.GetPartitionForRowChange(row, partitionNum)
+	index, key, err := partitionDispatcher.DispatchRowChangedEvent(row, partitionNum)
+	log.Debug("get partition for row", zap.String("topic", topic),
+		zap.Int32("partition", index), zap.String("key", key),
+		zap.Any("row", row))
 	if err != nil {
 		return errors.Trace(err)
 	}
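The ordering that the "Note" comment above insists on can be illustrated with a small sketch (hypothetical names only; rowEvent, writeEvent, transform, dispatch, and send are not the real sink APIs): if the partition were computed before the transformer ran, a transformer that rewrites key columns could route the event to a partition that no longer matches its final content.

package main

import "fmt"

// rowEvent is a hypothetical, heavily simplified row change event.
type rowEvent struct {
	Table string
	Key   string
}

// writeEvent mirrors the ordering used in the diff above: apply the
// transformer first, then derive the partition from the transformed row.
func writeEvent(
	row rowEvent,
	transform func(rowEvent) rowEvent,
	partitionNum int32,
	dispatch func(rowEvent, int32) int32,
	send func(int32, rowEvent) error,
) error {
	row = transform(row) // the transformer may change the row, including its key
	partition := dispatch(row, partitionNum)
	return send(partition, row)
}

func main() {
	dispatch := func(r rowEvent, n int32) int32 {
		// toy dispatcher: partition by key length, just to show the dependency on the row
		return int32(len(r.Key)) % n
	}
	send := func(p int32, r rowEvent) error {
		fmt.Printf("send %+v to partition %d\n", r, p)
		return nil
	}
	rewriteKey := func(r rowEvent) rowEvent { r.Key = r.Key + "-rewritten"; return r }
	_ = writeEvent(rowEvent{Table: "test.t", Key: "1"}, rewriteKey, 6, dispatch, send)
}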
10 changes: 10 additions & 0 deletions tests/integration_tests/mq_split_by_partition_key/conf/changefeed-kafka.toml
@@ -0,0 +1,10 @@
[filter]
rules = ['test.*']

[sink]
dispatchers = [
{matcher = ['test.*'], partition = "index-value"},
]

[sink.kafka-config]
output-raw-change-event = true
29 changes: 29 additions & 0 deletions tests/integration_tests/mq_split_by_partition_key/conf/diff_config.toml
@@ -0,0 +1,29 @@
# diff Configuration.

check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/mq_split_by_partition_key/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["test.?*"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.tidb0]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
13 changes: 13 additions & 0 deletions tests/integration_tests/mq_split_by_partition_key/data/data.sql
@@ -0,0 +1,13 @@
drop database if exists test;
create database test;
use test;

CREATE TABLE t (a INT PRIMARY KEY NONCLUSTERED, b INT);

INSERT INTO t VALUES (1, 2);
UPDATE t SET a = 2 WHERE a = 1;

INSERT INTO t VALUES (1, 3);
UPDATE t SET a="3" WHERE a="1";

CREATE TABLE test.finish_mark (a int primary key);
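
The UPDATE statements above change the primary key column a, so under index-value dispatching the old and new row images belong to different partitions; that is the situation this test is named after. Below is a hedged sketch of the general splitting technique a consumer can apply (the event type, splitKeyChangingUpdate, and partitionOf are illustrative, not the actual consumer code): a key-changing update becomes a delete for the old image plus an insert for the new one, each routed to its own partition.

package main

import "fmt"

// event is a hypothetical, minimal change event.
type event struct {
	Kind string // "insert", "delete" or "update"
	Key  string
}

// splitKeyChangingUpdate sketches the general technique: if an UPDATE changes
// the value used for partitioning, emit a DELETE for the old row image and an
// INSERT for the new one, instead of a single UPDATE that can only live on one partition.
func splitKeyChangingUpdate(oldKey, newKey string, partitionOf func(string) int32) map[int32][]event {
	out := map[int32][]event{}
	oldP, newP := partitionOf(oldKey), partitionOf(newKey)
	if oldP == newP {
		out[newP] = append(out[newP], event{Kind: "update", Key: newKey})
		return out
	}
	out[oldP] = append(out[oldP], event{Kind: "delete", Key: oldKey})
	out[newP] = append(out[newP], event{Kind: "insert", Key: newKey})
	return out
}

func main() {
	// toy partitioner: route by the last byte of the key
	partitionOf := func(key string) int32 { return int32(key[len(key)-1]) % 6 }
	// Mirrors "UPDATE t SET a = 2 WHERE a = 1" from data.sql above.
	fmt.Println(splitKeyChangingUpdate("a=1", "a=2", partitionOf))
}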
64 changes: 64 additions & 0 deletions tests/integration_tests/mq_split_by_partition_key/run.sh
@@ -0,0 +1,64 @@
#!/bin/bash

set -eu

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

# use the kafka/pulsar consumer with the canal-json decoder to sync data from the MQ sink to mysql
function run() {
	if [ "$SINK_TYPE" != "kafka" ] && [ "$SINK_TYPE" != "pulsar" ]; then
		return
	fi

	# clean up environment
	rm -rf $WORK_DIR && mkdir -p $WORK_DIR

	# start tidb cluster
	start_tidb_cluster --workdir $WORK_DIR

	cd $WORK_DIR

	TOPIC_NAME="ticdc-mq-split-by-partition-key"

	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

	if [ "$SINK_TYPE" == "kafka" ]; then
		SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true&partition-num=6"
	fi

	if [ "$SINK_TYPE" == "pulsar" ]; then
		run_pulsar_cluster $WORK_DIR normal
		SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true&partition-num=6"
	fi

	start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})
	changefeed_id="changefeed-kafka"
	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --config=$CUR/conf/$changefeed_id.toml -c "$changefeed_id"
	sleep 5 # wait for the changefeed to start

	# run the consumer that matches the sink; currently only kafka and pulsar are supported
	if [ "$SINK_TYPE" == "kafka" ]; then
		run_kafka_consumer $WORK_DIR $SINK_URI
	fi

	if [ "$SINK_TYPE" == "pulsar" ]; then
		run_pulsar_consumer --upstream-uri $SINK_URI
	fi

	run_sql_file $CUR/data/data.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}

	# sync_diff can't check tables that don't exist, so first make sure the expected tables have been created downstream
	check_table_exists test.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 200
	check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

	cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
