From 6be470b04153c3209b666353d825ca09f6b1f425 Mon Sep 17 00:00:00 2001 From: CharlesCheung96 Date: Thu, 20 Jun 2024 19:04:44 +0800 Subject: [PATCH] fix --- Makefile | 6 +- .../mq/dispatcher/partition/index_value.go | 2 + cdc/sink/dmlsink/mq/mq_dml_sink.go | 5 +- .../conf/changefeed-kafka.toml | 10 +++ .../conf/diff_config.toml | 29 +++++++++ .../mq_split_by_partition_key/data/data.sql | 13 ++++ .../mq_split_by_partition_key/run.sh | 64 +++++++++++++++++++ 7 files changed, 125 insertions(+), 4 deletions(-) create mode 100644 tests/integration_tests/mq_split_by_partition_key/conf/changefeed-kafka.toml create mode 100644 tests/integration_tests/mq_split_by_partition_key/conf/diff_config.toml create mode 100644 tests/integration_tests/mq_split_by_partition_key/data/data.sql create mode 100644 tests/integration_tests/mq_split_by_partition_key/run.sh diff --git a/Makefile b/Makefile index f2402e625dd..82c2b1d8fe0 100644 --- a/Makefile +++ b/Makefile @@ -234,9 +234,9 @@ check_third_party_binary: @which bin/sync_diff_inspector @which bin/go-ycsb @which bin/etcdctl - @which bin/jq - @which bin/minio - @which bin/bin/schema-registry-start + # @which bin/jq + # @which bin/minio + # @which bin/bin/schema-registry-start integration_test_build: check_failpoint_ctl storage_consumer kafka_consumer pulsar_consumer oauth2_server $(FAILPOINT_ENABLE) diff --git a/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go b/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go index 65b21350f96..758d0af6ccd 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go +++ b/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go @@ -130,5 +130,7 @@ func (r *IndexValueDispatcher) DispatchRowChangedEvent(row *model.RowChangedEven } sum32 := r.hasher.Sum32() + log.Debug("dispatch row changed event", zap.String("table", row.TableInfo.GetTableName()), + zap.Int32("partitionNum", partitionNum), zap.Uint32("sum32", sum32)) return int32(sum32 % uint32(partitionNum)), strconv.FormatInt(int64(sum32), 10), nil } diff --git a/cdc/sink/dmlsink/mq/mq_dml_sink.go b/cdc/sink/dmlsink/mq/mq_dml_sink.go index 7ace2f29b0e..a6055d491c6 100644 --- a/cdc/sink/dmlsink/mq/mq_dml_sink.go +++ b/cdc/sink/dmlsink/mq/mq_dml_sink.go @@ -213,7 +213,10 @@ func (s *dmlSink) writeEvent(txn *dmlsink.CallbackableEvent[*model.SingleTableTx // Note: Calculate the partition index after the transformer is applied. // Because the transformer may change the row of the event. 
- index, key, err := s.alive.eventRouter.GetPartitionForRowChange(row, partitionNum) + index, key, err := partitionDispatcher.DispatchRowChangedEvent(row, partitionNum) + log.Debug("get partition for row", zap.String("topic", topic), + zap.Int32("partition", index), zap.String("key", key), + zap.Any("row", row)) if err != nil { return errors.Trace(err) } diff --git a/tests/integration_tests/mq_split_by_partition_key/conf/changefeed-kafka.toml b/tests/integration_tests/mq_split_by_partition_key/conf/changefeed-kafka.toml new file mode 100644 index 00000000000..4fa2437a488 --- /dev/null +++ b/tests/integration_tests/mq_split_by_partition_key/conf/changefeed-kafka.toml @@ -0,0 +1,10 @@ +[filter] +rules = ['test.*'] + +[sink] +dispatchers = [ + {matcher = ['test.*'], partition = "index-value"}, +] + +[sink.kafka-config] +output-raw-change-event = true diff --git a/tests/integration_tests/mq_split_by_partition_key/conf/diff_config.toml b/tests/integration_tests/mq_split_by_partition_key/conf/diff_config.toml new file mode 100644 index 00000000000..0d0dfea5517 --- /dev/null +++ b/tests/integration_tests/mq_split_by_partition_key/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. + +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] +output-dir = "/tmp/tidb_cdc_test/mq_split_by_partition_key/output" + +source-instances = ["mysql1"] + +target-instance = "tidb0" + +target-check-tables = ["test.?*"] + +[data-sources] +[data-sources.mysql1] +host = "127.0.0.1" +port = 4000 +user = "root" +password = "" + +[data-sources.tidb0] +host = "127.0.0.1" +port = 3306 +user = "root" +password = "" diff --git a/tests/integration_tests/mq_split_by_partition_key/data/data.sql b/tests/integration_tests/mq_split_by_partition_key/data/data.sql new file mode 100644 index 00000000000..79e2d8a430d --- /dev/null +++ b/tests/integration_tests/mq_split_by_partition_key/data/data.sql @@ -0,0 +1,13 @@ +drop database if exists test; +create database test; +use test; + +CREATE TABLE t (a INT PRIMARY KEY NONCLUSTERED, b INT); + +INSERT INTO t VALUES (1, 2); +UPDATE t SET a = 2 WHERE a = 1; + +INSERT INTO t VALUES (1, 3); +UPDATE t SET a="3" WHERE a="1"; + +CREATE TABLE test.finish_mark (a int primary key); \ No newline at end of file diff --git a/tests/integration_tests/mq_split_by_partition_key/run.sh b/tests/integration_tests/mq_split_by_partition_key/run.sh new file mode 100644 index 00000000000..f9ac4878031 --- /dev/null +++ b/tests/integration_tests/mq_split_by_partition_key/run.sh @@ -0,0 +1,64 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +# use kafka-consumer with canal-json decoder to sync data from kafka to mysql +function run() { + if [ "$SINK_TYPE" != "kafka" ] && [ "$SINK_TYPE" != "pulsar" ]; then + return + fi + + # clean up environment + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + # start tidb cluster + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + TOPIC_NAME="ticdc-mq-split-by-partition-key" + + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + if [ "$SINK_TYPE" == "kafka" ]; then + SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true&partition-num=6" + fi + + if [ "$SINK_TYPE" == "pulsar" ]; then + run_pulsar_cluster $WORK_DIR normal + SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true&partition-num=6" + fi + + 
start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})
+	changefeed_id="changefeed-kafka"
+	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --config=$CUR/conf/$changefeed_id.toml -c "$changefeed_id"
+	sleep 5 # wait for the changefeed to start
+	# run the consumer that matches the sink type; only kafka and pulsar are supported
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		run_kafka_consumer $WORK_DIR $SINK_URI
+	fi
+
+	if [ "$SINK_TYPE" == "pulsar" ]; then
+		run_pulsar_consumer --upstream-uri $SINK_URI
+	fi
+
+	run_sql_file $CUR/data/data.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+
+	# sync_diff can't check a non-existent table, so first wait for the expected tables to appear downstream
+	check_table_exists test.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 200
+	check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+
+	cleanup_process $CDC_BINARY
+}
+
+trap stop_tidb_cluster EXIT
+run $*
+check_logs $WORK_DIR
+echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
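
Note: the sketch below illustrates the partition mapping performed by the patched DispatchRowChangedEvent above (hash the index-value columns, take the 32-bit sum modulo the partition count, and return the sum's decimal string as the message key). It is a minimal standalone illustration, not TiCDC's actual code; hash/fnv and the helper name dispatchByIndexValue are stand-ins chosen for this sketch.

// sketch.go - minimal illustration of index-value partition dispatch (assumed names, not TiCDC code)
package main

import (
	"fmt"
	"hash/fnv"
	"strconv"
)

// dispatchByIndexValue mirrors the final step visible in the index_value.go hunk:
// hash the index-value columns, map the 32-bit sum onto a partition with a modulo,
// and return the sum's decimal string as the message key.
// hash/fnv stands in for the dispatcher's internal hasher (an assumption).
func dispatchByIndexValue(indexValues []string, partitionNum int32) (int32, string) {
	h := fnv.New32a()
	for _, v := range indexValues {
		h.Write([]byte(v))
	}
	sum32 := h.Sum32()
	return int32(sum32 % uint32(partitionNum)), strconv.FormatInt(int64(sum32), 10)
}

func main() {
	// Rows that share the same index value always hash to the same partition,
	// so per-key ordering is preserved when events are split by partition key.
	partition, key := dispatchByIndexValue([]string{"a", "1"}, 6)
	fmt.Printf("partition=%d key=%s\n", partition, key)
}

Running the sketch (go run sketch.go) prints the partition chosen for the sample index value; feeding the same value repeatedly always yields the same partition, which is the property the mq_split_by_partition_key test exercises with 6 partitions.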