add para and test
CharlesCheung96 committed Jun 18, 2024
1 parent a0cd9f4 commit f0b10f0
Showing 6 changed files with 174 additions and 18 deletions.
6 changes: 6 additions & 0 deletions cdc/api/v2/model.go
@@ -318,6 +318,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
AuthTLSCertificatePath: c.Sink.PulsarConfig.AuthTLSCertificatePath,
AuthTLSPrivateKeyPath: c.Sink.PulsarConfig.AuthTLSPrivateKeyPath,
OutputRawChangeEvent: c.Sink.PulsarConfig.OutputRawChangeEvent,
SplitByPartitionKey: c.Sink.PulsarConfig.SplitByPartitionKey,

}
if c.Sink.PulsarConfig.OAuth2 != nil {
pulsarConfig.OAuth2 = &config.OAuth2{
@@ -404,6 +405,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
LargeMessageHandle: largeMessageHandle,
GlueSchemaRegistryConfig: glueSchemaRegistryConfig,
OutputRawChangeEvent: c.Sink.KafkaConfig.OutputRawChangeEvent,
SplitByPartitionKey: c.Sink.KafkaConfig.SplitByPartitionKey,

}
}
var mysqlConfig *config.MySQLConfig
@@ -670,6 +672,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
LargeMessageHandle: largeMessageHandle,
GlueSchemaRegistryConfig: glueSchemaRegistryConfig,
OutputRawChangeEvent: cloned.Sink.KafkaConfig.OutputRawChangeEvent,
SplitByPartitionKey: cloned.Sink.KafkaConfig.SplitByPartitionKey,

}
}
var mysqlConfig *MySQLConfig
@@ -713,6 +716,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
AuthTLSCertificatePath: cloned.Sink.PulsarConfig.AuthTLSCertificatePath,
AuthTLSPrivateKeyPath: cloned.Sink.PulsarConfig.AuthTLSPrivateKeyPath,
OutputRawChangeEvent: cloned.Sink.PulsarConfig.OutputRawChangeEvent,
SplitByPartitionKey: cloned.Sink.PulsarConfig.SplitByPartitionKey,

}
if cloned.Sink.PulsarConfig.OAuth2 != nil {
pulsarConfig.OAuth2 = &PulsarOAuth2{
@@ -1201,6 +1205,7 @@ type PulsarConfig struct {
AuthTLSPrivateKeyPath *string `json:"auth-tls-private-key-path,omitempty"`
OAuth2 *PulsarOAuth2 `json:"oauth2,omitempty"`
OutputRawChangeEvent *bool `json:"output-raw-change-event,omitempty"`
SplitByPartitionKey *bool `json:"split-by-partition-key,omitempty"`
}

// PulsarOAuth2 is the configuration for OAuth2
@@ -1251,6 +1256,7 @@ type KafkaConfig struct {
LargeMessageHandle *LargeMessageHandleConfig `json:"large_message_handle,omitempty"`
GlueSchemaRegistryConfig *GlueSchemaRegistryConfig `json:"glue_schema_registry_config,omitempty"`
OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"`
SplitByPartitionKey *bool `json:"split_by_partition_key,omitempty"`
}

// MySQLConfig represents a MySQL sink configuration
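One detail worth noting in the API model above: the Kafka field's json tag is snake_case (split_by_partition_key) while the Pulsar field's is kebab-case (split-by-partition-key), so the same option is spelled differently in v2 API payloads. A minimal standalone sketch of how the new pointer field serializes under those tags (stand-in structs reduced to this one field; not code from the commit):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Stand-ins for the v2 API types touched above, reduced to the new field and its json tag.
type KafkaConfig struct {
	SplitByPartitionKey *bool `json:"split_by_partition_key,omitempty"`
}

type PulsarConfig struct {
	SplitByPartitionKey *bool `json:"split-by-partition-key,omitempty"`
}

func main() {
	enable := true
	k, _ := json.Marshal(KafkaConfig{SplitByPartitionKey: &enable})
	p, _ := json.Marshal(PulsarConfig{SplitByPartitionKey: &enable})
	fmt.Println(string(k)) // {"split_by_partition_key":true}
	fmt.Println(string(p)) // {"split-by-partition-key":true}

	// Leaving the pointer nil drops the key entirely thanks to omitempty,
	// which is what lets the internal config fall back to its default.
	k, _ = json.Marshal(KafkaConfig{})
	fmt.Println(string(k)) // {}
}
```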
4 changes: 2 additions & 2 deletions cdc/sink/dmlsink/mq/kafka_dml_sink.go
@@ -122,8 +122,8 @@ func NewKafkaDMLSink(
metricsCollector := factory.MetricsCollector(tiflowutil.RoleProcessor, adminClient)
dmlProducer := producerCreator(ctx, changefeedID, asyncProducer, metricsCollector, errCh, failpointCh)
encoderGroup := codec.NewEncoderGroup(replicaConfig.Sink, encoderBuilder, changefeedID)
-	s := newDMLSink(ctx, changefeedID, dmlProducer, adminClient, topicManager, eventRouter, trans, encoderGroup,
-		protocol, scheme, replicaConfig.Sink.KafkaConfig.GetOutputRawChangeEvent(), errCh)
+	s := newDMLSink(ctx, changefeedID, dmlProducer, adminClient, topicManager, eventRouter, trans, encoderGroup, protocol, scheme,
+		replicaConfig.Sink.KafkaConfig.GetOutputRawChangeEvent(), replicaConfig.Sink.KafkaConfig.GetSplitByPartitionKey(), errCh)
log.Info("DML sink producer created",
zap.String("namespace", changefeedID.Namespace),
zap.String("changefeedID", changefeedID.ID))
36 changes: 21 additions & 15 deletions cdc/sink/dmlsink/mq/mq_dml_sink.go
@@ -72,6 +72,7 @@ type dmlSink struct {

scheme string
outputRawChangeEvent bool
splitByPartitionKey bool
}

func newDMLSink(
@@ -86,6 +87,7 @@ func newDMLSink(
protocol config.Protocol,
scheme string,
outputRawChangeEvent bool,
splitByPartitionKey bool,
errCh chan error,
) *dmlSink {
ctx, cancel := context.WithCancelCause(ctx)
@@ -101,6 +103,7 @@
dead: make(chan struct{}),
scheme: scheme,
outputRawChangeEvent: outputRawChangeEvent,
splitByPartitionKey: splitByPartitionKey,
}
s.alive.transformer = transformer
s.alive.eventRouter = eventRouter
@@ -179,23 +182,26 @@ func (s *dmlSink) writeEvent(txn *dmlsink.CallbackableEvent[*model.SingleTableTx
tableInfo := txn.Event.Rows[0].TableInfo
partitionDispatcher := s.alive.eventRouter.GetPartitionDispatcher(tableInfo.TableName.Schema, tableInfo.TableName.Table)

-	// Split the update event to delete and insert event if the partition key is updated.
-	rows := make([]*model.RowChangedEvent, 0, len(txn.Event.Rows))
-	for _, row := range txn.Event.Rows {
-		changed, err := partitionDispatcher.IsPartitionKeyUpdated(row)
-		if err != nil {
-			return errors.Trace(err)
-		}
-		if !changed {
-			rows = append(rows, row)
-			continue
-		}
+	rows := txn.Event.Rows
+	if s.splitByPartitionKey {
+		// Split the update event to delete and insert event if the partition key is updated.
+		rows = make([]*model.RowChangedEvent, 0, len(txn.Event.Rows))
+		for _, row := range txn.Event.Rows {
+			changed, err := partitionDispatcher.IsPartitionKeyUpdated(row)
+			if err != nil {
+				return errors.Trace(err)
+			}
+			if !changed {
+				rows = append(rows, row)
+				continue
+			}

-		deleteEvent, insertEvent, err := model.SplitUpdateEvent(row)
-		if err != nil {
-			return errors.Trace(err)
+			deleteEvent, insertEvent, err := model.SplitUpdateEvent(row)
+			if err != nil {
+				return errors.Trace(err)
+			}
+			rows = append(rows, deleteEvent, insertEvent)
 		}
-		rows = append(rows, deleteEvent, insertEvent)
 	}

rowCallback := toRowCallback(txn.Callback, uint64(len(rows)))
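Restating the behavioural change in writeEvent: with split-by-partition-key disabled the transaction's rows are forwarded untouched, and only when it is enabled are updates whose partition key changed replaced by a delete plus an insert. A self-contained sketch of that control flow (the generic signature and helper names are illustrative; the real code operates on *model.RowChangedEvent via the partition dispatcher and model.SplitUpdateEvent shown above):

```go
package sketch

// splitRows mirrors the control flow added to writeEvent above.
// keyUpdated and splitUpdate stand in for the dispatcher's IsPartitionKeyUpdated
// and model.SplitUpdateEvent respectively.
func splitRows[R any](
	rows []R,
	splitByPartitionKey bool,
	keyUpdated func(R) (bool, error),
	splitUpdate func(R) (del, ins R, err error),
) ([]R, error) {
	if !splitByPartitionKey {
		// Option off: forward the rows exactly as they arrived.
		return rows, nil
	}
	out := make([]R, 0, len(rows))
	for _, row := range rows {
		changed, err := keyUpdated(row)
		if err != nil {
			return nil, err
		}
		if !changed {
			out = append(out, row)
			continue
		}
		// The partition key changed, so the update is emitted as delete + insert,
		// letting each half be routed to its own partition.
		del, ins, err := splitUpdate(row)
		if err != nil {
			return nil, err
		}
		out = append(out, del, ins)
	}
	return out, nil
}
```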
125 changes: 125 additions & 0 deletions cdc/sink/dmlsink/mq/mq_dml_sink_test.go
@@ -27,6 +27,7 @@ import (
"github.com/pingcap/tiflow/cdc/sink/tablesink/state"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/sink/kafka"
"github.com/pingcap/tiflow/pkg/util"
"github.com/stretchr/testify/require"
)

@@ -112,3 +113,127 @@ func TestWriteEvents(t *testing.T) {
require.Len(t, errCh, 0)
require.Len(t, s.alive.worker.producer.(*dmlproducer.MockDMLProducer).GetAllEvents(), 3000)
}

func TestWriteEventsSplitByPartitionKey(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" +
"&max-message-bytes=1048576&partition-num=1" +
"&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=open-protocol"
uri := fmt.Sprintf(uriTemplate, "127.0.0.1:9092", kafka.DefaultMockTopicName)

sinkURI, err := url.Parse(uri)
require.NoError(t, err)
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink.KafkaConfig = &config.KafkaConfig{
SplitByPartitionKey: util.AddressOf(true),
}

cases := []struct {
rule *config.DispatchRule
res int
}{
{
rule: &config.DispatchRule{
Matcher: []string{"test.t"},
PartitionRule: "default",
},
res: 3002,
}, {
rule: &config.DispatchRule{
Matcher: []string{"test.t"},
PartitionRule: "columns",
Columns: []string{"a"},
},
res: 3003,
}, {
rule: &config.DispatchRule{
Matcher: []string{"test.t"},
PartitionRule: "index-value",
},
res: 3003,
}, {
rule: &config.DispatchRule{
Matcher: []string{"test.t"},
PartitionRule: "ts",
},
res: 3002,
}, {
rule: &config.DispatchRule{
Matcher: []string{"test.t"},
PartitionRule: "table",
},
res: 3002,
},
}

for _, testCase := range cases {
testName := testCase.rule.PartitionRule
replicaConfig.Sink.DispatchRules = []*config.DispatchRule{testCase.rule}
require.NoError(t, replicaConfig.ValidateAndAdjust(sinkURI))
errCh := make(chan error, 1)

ctx = context.WithValue(ctx, "testing.T", t)
changefeedID := model.DefaultChangeFeedID("test")
s, err := NewKafkaDMLSink(ctx, changefeedID, sinkURI, replicaConfig, errCh,
kafka.NewMockFactory, dmlproducer.NewDMLMockProducer)
require.NoError(t, err)
require.NotNil(t, s)
defer s.Close()

helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(a varchar(255) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)

tableStatus := state.TableSinkSinking
row := &model.RowChangedEvent{
CommitTs: 1,
TableInfo: tableInfo,
Columns: model.Columns2ColumnDatas([]*model.Column{{Name: "a", Value: "aa"}}, tableInfo),
}

// Insert
events := make([]*dmlsink.CallbackableEvent[*model.SingleTableTxn], 0, 3000)
for i := 0; i < 3000; i++ {
events = append(events, &dmlsink.TxnCallbackableEvent{
Event: &model.SingleTableTxn{
Rows: []*model.RowChangedEvent{row},
},
Callback: func() {},
SinkState: &tableStatus,
})
}
// Update
events = append(events, &dmlsink.TxnCallbackableEvent{
Event: &model.SingleTableTxn{
Rows: []*model.RowChangedEvent{
{ // case 1: partition key not updated
CommitTs: 1,
TableInfo: tableInfo,
Columns: model.Columns2ColumnDatas([]*model.Column{{Name: "a", Value: "aa"}}, tableInfo),
PreColumns: model.Columns2ColumnDatas([]*model.Column{{Name: "a", Value: "aa"}}, tableInfo),
},
{ // case 2: partition key updated
CommitTs: 1,
TableInfo: tableInfo,
Columns: model.Columns2ColumnDatas([]*model.Column{{Name: "a", Value: "bb"}}, tableInfo),
PreColumns: model.Columns2ColumnDatas([]*model.Column{{Name: "a", Value: "aa"}}, tableInfo),
},
},
},
Callback: func() {},
SinkState: &tableStatus,
})

err = s.WriteEvents(events...)
// Wait for the events to be received by the worker.
time.Sleep(time.Second)
require.NoError(t, err)
require.Len(t, errCh, 0)
require.Equal(t, testCase.res, len(s.alive.worker.producer.(*dmlproducer.MockDMLProducer).GetAllEvents()), testName)
}
}
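A reading of the expected counts in the cases above (an interpretation, not stated in the commit): each run writes 3000 single-row insert transactions plus one final transaction containing two update rows, so the baseline is 3002 produced events. Under the "columns" (on a) and "index-value" rules the partition key is derived from column a, so the update that changes a from "aa" to "bb" is split into a delete plus an insert and the total becomes 3003. Under "default", "ts", and "table" the partition key does not depend on the updated column value, nothing is split, and the total stays at 3002.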
2 changes: 1 addition & 1 deletion cdc/sink/dmlsink/mq/pulsar_dml_sink.go
@@ -124,7 +124,7 @@ func NewPulsarDMLSink(
encoderGroup := codec.NewEncoderGroup(replicaConfig.Sink, encoderBuilder, changefeedID)

s := newDMLSink(ctx, changefeedID, p, nil, topicManager, eventRouter, trans, encoderGroup,
-		protocol, scheme, pConfig.GetOutputRawChangeEvent(), errCh)
+		protocol, scheme, pConfig.GetOutputRawChangeEvent(), pConfig.GetSplitByPartitionKey(), errCh)

return s, nil
}
19 changes: 19 additions & 0 deletions pkg/config/sink.go
@@ -436,6 +436,7 @@ type KafkaConfig struct {

// OutputRawChangeEvent controls whether to split the update pk/uk events.
OutputRawChangeEvent *bool `toml:"output-raw-change-event" json:"output-raw-change-event,omitempty"`
SplitByPartitionKey *bool `toml:"split-by-partition-key" json:"split-by-partition-key,omitempty"`
}

// GetOutputRawChangeEvent returns the value of OutputRawChangeEvent
@@ -446,6 +447,14 @@ func (k *KafkaConfig) GetOutputRawChangeEvent() bool {
return *k.OutputRawChangeEvent
}

// GetSplitByPartitionKey returns the value of SplitByPartitionKey
func (k *KafkaConfig) GetSplitByPartitionKey() bool {
if k == nil || k.SplitByPartitionKey == nil {
return false
}
return *k.SplitByPartitionKey

}

// MaskSensitiveData masks sensitive data in KafkaConfig
func (k *KafkaConfig) MaskSensitiveData() {
k.SASLPassword = aws.String("******")
@@ -600,6 +609,8 @@ type PulsarConfig struct {

// OutputRawChangeEvent controls whether to split the update pk/uk events.
OutputRawChangeEvent *bool `toml:"output-raw-change-event" json:"output-raw-change-event,omitempty"`
// SplitByPartitionKey controls whether to split the events by partition key.
SplitByPartitionKey *bool `toml:"split-by-partition-key" json:"split-by-partition-key,omitempty"`

// BrokerURL is used to configure service brokerUrl for the Pulsar service.
// This parameter is a part of the `sink-uri`. Internal use only.
@@ -616,6 +627,14 @@ func (c *PulsarConfig) GetOutputRawChangeEvent() bool {
return *c.OutputRawChangeEvent
}

// GetSplitByPartitionKey returns the value of SplitByPartitionKey
func (c *PulsarConfig) GetSplitByPartitionKey() bool {
if c == nil || c.SplitByPartitionKey == nil {
return false
}
return *c.SplitByPartitionKey

}

// MaskSensitiveData masks sensitive data in PulsarConfig
func (c *PulsarConfig) MaskSensitiveData() {
if c.AuthenticationToken != nil {
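For completeness, a small usage sketch of the new getters' defaulting behaviour (import paths taken from the test file above; an illustration, not code from the commit):

```go
package main

import (
	"fmt"

	"github.com/pingcap/tiflow/pkg/config"
	"github.com/pingcap/tiflow/pkg/util"
)

func main() {
	// A nil receiver or an unset pointer both fall back to false, i.e. splitting is opt-in.
	var unset *config.KafkaConfig
	fmt.Println(unset.GetSplitByPartitionKey()) // false

	// Explicit opt-in, mirroring the test's use of util.AddressOf(true).
	cfg := &config.KafkaConfig{SplitByPartitionKey: util.AddressOf(true)}
	fmt.Println(cfg.GetSplitByPartitionKey()) // true
}
```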
