Skip to content

Commit

Permalink
simple (ticdc): support send all tables bootstrap message at changefe…
Browse files Browse the repository at this point in the history
…ed start (#11239)

close #11315
  • Loading branch information
asddongmen committed Jun 20, 2024
1 parent 2de4633 commit 58636ed
Show file tree
Hide file tree
Showing 15 changed files with 190 additions and 26 deletions.
9 changes: 9 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,10 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
if c.Sink.SendBootstrapToAllPartition != nil {
res.Sink.SendBootstrapToAllPartition = util.AddressOf(*c.Sink.SendBootstrapToAllPartition)
}

if c.Sink.SendAllBootstrapAtStart != nil {
res.Sink.SendAllBootstrapAtStart = util.AddressOf(*c.Sink.SendAllBootstrapAtStart)
}
}
if c.Mounter != nil {
res.Mounter = &config.MounterConfig{
Expand Down Expand Up @@ -792,6 +796,10 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
res.Sink.SendBootstrapToAllPartition = util.AddressOf(*cloned.Sink.SendBootstrapToAllPartition)
}

if cloned.Sink.SendAllBootstrapAtStart != nil {
res.Sink.SendAllBootstrapAtStart = util.AddressOf(*cloned.Sink.SendAllBootstrapAtStart)
}

if cloned.Sink.DebeziumDisableSchema != nil {
res.Sink.DebeziumDisableSchema = util.AddressOf(*cloned.Sink.DebeziumDisableSchema)
}
Expand Down Expand Up @@ -957,6 +965,7 @@ type SinkConfig struct {
SendBootstrapIntervalInSec *int64 `json:"send_bootstrap_interval_in_sec,omitempty"`
SendBootstrapInMsgCount *int32 `json:"send_bootstrap_in_msg_count,omitempty"`
SendBootstrapToAllPartition *bool `json:"send_bootstrap_to_all_partition,omitempty"`
SendAllBootstrapAtStart *bool `json:"send-all-bootstrap-at-start,omitempty"`
DebeziumDisableSchema *bool `json:"debezium_disable_schema,omitempty"`
DebeziumConfig *DebeziumConfig `json:"debezium,omitempty"`
OpenProtocolConfig *OpenProtocolConfig `json:"open,omitempty"`
Expand Down
1 change: 1 addition & 0 deletions cdc/api/v2/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ var defaultAPIConfig = &ReplicaConfig{
SendBootstrapIntervalInSec: util.AddressOf(int64(120)),
SendBootstrapInMsgCount: util.AddressOf(int32(10000)),
SendBootstrapToAllPartition: util.AddressOf(true),
SendAllBootstrapAtStart: util.AddressOf(false),
DebeziumDisableSchema: util.AddressOf(false),
OpenProtocolConfig: &OpenProtocolConfig{OutputOldValue: true},
DebeziumConfig: &DebeziumConfig{OutputOldValue: true},
Expand Down
5 changes: 5 additions & 0 deletions cdc/entry/schema_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,3 +297,8 @@ func (s *SchemaTestHelper) Close() {
s.domain.Close()
s.storage.Close() //nolint:errcheck
}

// SchemaStorage returns the schema storage
func (s *SchemaTestHelper) SchemaStorage() SchemaStorage {
return s.schemaStorage
}
4 changes: 3 additions & 1 deletion cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,9 @@ LOOP2:
c.schema,
c.redoDDLMgr,
c.redoMetaMgr,
util.GetOrZero(cfInfo.Config.BDRMode))
util.GetOrZero(cfInfo.Config.BDRMode),
cfInfo.Config.Sink.ShouldSendAllBootstrapAtStart(),
)

// create scheduler
cfg := *c.cfg
Expand Down
11 changes: 9 additions & 2 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type mockDDLSink struct {
// whether to record the DDL history, only for rename table
recordDDLHistory bool
// a slice of DDL history, only for rename table
ddlHistory []string
ddlHistory []*model.DDLEvent
mu struct {
sync.Mutex
checkpointTs model.Ts
Expand Down Expand Up @@ -117,7 +117,7 @@ func (m *mockDDLSink) emitDDLEvent(ctx context.Context, ddl *model.DDLEvent) (bo
}
}()
if m.recordDDLHistory {
m.ddlHistory = append(m.ddlHistory, ddl.Query)
m.ddlHistory = append(m.ddlHistory, ddl)
} else {
m.ddlHistory = nil
}
Expand Down Expand Up @@ -155,6 +155,13 @@ func (m *mockDDLSink) Barrier(ctx context.Context) error {
return nil
}

func (m *mockDDLSink) emitBootstrap(ctx context.Context, bootstrap *model.DDLEvent) error {
if m.recordDDLHistory {
m.ddlHistory = append(m.ddlHistory, bootstrap)
}
return nil
}

type mockScheduler struct {
currentTables []model.TableID
}
Expand Down
76 changes: 62 additions & 14 deletions cdc/owner/ddl_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ type ddlManager struct {

BDRMode bool
ddlResolvedTs model.Ts

shouldSendAllBootstrapAtStart bool
bootstraped bool
}

func newDDLManager(
Expand All @@ -143,6 +146,7 @@ func newDDLManager(
redoManager redo.DDLManager,
redoMetaManager redo.MetaManager,
bdrMode bool,
shouldSendAllBootstrapAtStart bool,
) *ddlManager {
log.Info("owner create ddl manager",
zap.String("namespace", changefeedID.Namespace),
Expand All @@ -152,19 +156,56 @@ func newDDLManager(
zap.Bool("bdrMode", bdrMode))

return &ddlManager{
changfeedID: changefeedID,
ddlSink: ddlSink,
filter: filter,
ddlPuller: ddlPuller,
schema: schema,
redoDDLManager: redoManager,
redoMetaManager: redoMetaManager,
startTs: startTs,
checkpointTs: checkpointTs,
ddlResolvedTs: startTs,
BDRMode: bdrMode,
pendingDDLs: make(map[model.TableName][]*model.DDLEvent),
changfeedID: changefeedID,
ddlSink: ddlSink,
filter: filter,
ddlPuller: ddlPuller,
schema: schema,
redoDDLManager: redoManager,
redoMetaManager: redoMetaManager,
startTs: startTs,
checkpointTs: checkpointTs,
ddlResolvedTs: startTs,
BDRMode: bdrMode,
pendingDDLs: make(map[model.TableName][]*model.DDLEvent),
shouldSendAllBootstrapAtStart: shouldSendAllBootstrapAtStart,
}
}

func (m *ddlManager) checkAndSendBootstrapMsgs(ctx context.Context) (bool, error) {
if !m.shouldSendAllBootstrapAtStart || m.bootstraped {
return true, nil
}
start := time.Now()
defer func() {
log.Info("send bootstrap messages finished",
zap.Stringer("changefeed", m.changfeedID),
zap.Duration("cost", time.Since(start)))
}()
// Send bootstrap messages to downstream.
tableInfo, err := m.allTables(ctx)
if err != nil {
return false, errors.Trace(err)
}
log.Info("start to send bootstrap messages",
zap.Stringer("changefeed", m.changfeedID),
zap.Int("tables", len(tableInfo)))

for _, table := range tableInfo {
if table.TableInfo.IsView() {
continue
}
ddlEvent := &model.DDLEvent{
TableInfo: table,
IsBootstrap: true,
}
err := m.ddlSink.emitBootstrap(ctx, ddlEvent)
if err != nil {
return false, errors.Trace(err)
}
}
m.bootstraped = true
return true, nil
}

// tick the ddlHandler, it does the following things:
Expand All @@ -183,6 +224,14 @@ func (m *ddlManager) tick(
m.justSentDDL = nil
m.checkpointTs = checkpointTs

ok, err := m.checkAndSendBootstrapMsgs(ctx)
if err != nil {
return nil, nil, errors.Trace(err)
}
if !ok {
return nil, nil, nil
}

currentTables, err := m.allTables(ctx)
if err != nil {
return nil, nil, errors.Trace(err)
Expand Down Expand Up @@ -483,8 +532,7 @@ func (m *ddlManager) barrier() *schedulepb.BarrierWithMinTs {
return barrier
}

// allTables returns all tables in the schema that
// less or equal than the checkpointTs.
// allTables returns all tables in the schema in current checkpointTs.
func (m *ddlManager) allTables(ctx context.Context) ([]*model.TableInfo, error) {
if m.tableInfoCache == nil {
ts := m.getSnapshotTs()
Expand Down
42 changes: 39 additions & 3 deletions cdc/owner/ddl_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ func createDDLManagerForTest(t *testing.T) *ddlManager {
schema,
redo.NewDisabledDDLManager(),
redo.NewDisabledMetaManager(),
false)
false,
false,
)
return res
}

Expand Down Expand Up @@ -246,9 +248,9 @@ func TestExecRenameTablesDDL(t *testing.T) {
}
require.Len(t, mockDDLSink.ddlHistory, 2)
require.Equal(t, "RENAME TABLE `test1`.`tb1` TO `test2`.`tb10`",
mockDDLSink.ddlHistory[0])
mockDDLSink.ddlHistory[0].Query)
require.Equal(t, "RENAME TABLE `test2`.`tb2` TO `test1`.`tb20`",
mockDDLSink.ddlHistory[1])
mockDDLSink.ddlHistory[1].Query)

// mock all rename table statements have been done
mockDDLSink.resetDDLDone = false
Expand Down Expand Up @@ -459,3 +461,37 @@ func TestIsGlobalDDL(t *testing.T) {
require.Equal(t, c.ret, isGlobalDDL(c.ddl))
}
}

func TestCheckAndSendBootstrapMsgs(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()
ddl1 := helper.DDL2Event("create table test.tb1(id int primary key)")
ddl2 := helper.DDL2Event("create table test.tb2(id int primary key)")

ctx := context.Background()
dm := createDDLManagerForTest(t)
dm.schema = helper.SchemaStorage()
dm.startTs, dm.checkpointTs = ddl2.CommitTs, ddl2.CommitTs

mockDDLSink := dm.ddlSink.(*mockDDLSink)
mockDDLSink.recordDDLHistory = true

// do not send all bootstrap messages
send, err := dm.checkAndSendBootstrapMsgs(ctx)
require.Nil(t, err)
require.True(t, send)
require.False(t, dm.bootstraped)
require.Equal(t, 0, len(mockDDLSink.ddlHistory))

// send all bootstrap messages -> tb1 and tb2
dm.shouldSendAllBootstrapAtStart = true
send, err = dm.checkAndSendBootstrapMsgs(ctx)
require.Nil(t, err)
require.True(t, send)
require.True(t, dm.bootstraped)
require.Equal(t, 2, len(mockDDLSink.ddlHistory))
require.True(t, mockDDLSink.ddlHistory[0].IsBootstrap)
require.True(t, mockDDLSink.ddlHistory[1].IsBootstrap)
require.Equal(t, ddl1.TableInfo.TableName, mockDDLSink.ddlHistory[0].TableInfo.TableName)
require.Equal(t, ddl2.TableInfo.TableName, mockDDLSink.ddlHistory[1].TableInfo.TableName)
}
14 changes: 10 additions & 4 deletions cdc/owner/ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ type DDLSink interface {
// the caller of this function can call again and again until a true returned
emitDDLEvent(ctx context.Context, ddl *model.DDLEvent) (bool, error)
emitSyncPoint(ctx context.Context, checkpointTs uint64) error
// emitBootstrap emits the table bootstrap event in a blocking way.
// It will return after the bootstrap event is sent.
emitBootstrap(ctx context.Context, bootstrap *model.DDLEvent) error
// close the ddlsink, cancel running goroutine.
close(ctx context.Context) error
}
Expand Down Expand Up @@ -121,10 +124,6 @@ func ddlSinkInitializer(ctx context.Context, a *ddlSinkImpl) error {
return errors.Trace(err)
}
a.sink = s

if !util.GetOrZero(a.info.Config.EnableSyncPoint) {
return nil
}
return nil
}

Expand Down Expand Up @@ -472,3 +471,10 @@ func (s *ddlSinkImpl) addSpecialComment(ddl *model.DDLEvent) (string, error) {

return result, nil
}

func (s *ddlSinkImpl) emitBootstrap(ctx context.Context, bootstrap *model.DDLEvent) error {
if err := s.makeSinkReady(ctx); err != nil {
return errors.Trace(err)
}
return s.sink.WriteDDLEvent(ctx, bootstrap)
}
2 changes: 2 additions & 0 deletions pkg/cmd/util/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ func TestAndWriteExampleReplicaTOML(t *testing.T) {
SendBootstrapIntervalInSec: util.AddressOf(int64(120)),
SendBootstrapInMsgCount: util.AddressOf(int32(10000)),
SendBootstrapToAllPartition: util.AddressOf(true),
SendAllBootstrapAtStart: util.AddressOf(false),
DebeziumDisableSchema: util.AddressOf(false),
OpenProtocol: &config.OpenProtocolConfig{OutputOldValue: true},
Debezium: &config.DebeziumConfig{OutputOldValue: true},
Expand Down Expand Up @@ -253,6 +254,7 @@ func TestAndWriteStorageSinkTOML(t *testing.T) {
SendBootstrapIntervalInSec: util.AddressOf(int64(120)),
SendBootstrapInMsgCount: util.AddressOf(int32(10000)),
SendBootstrapToAllPartition: util.AddressOf(true),
SendAllBootstrapAtStart: util.AddressOf(false),
DebeziumDisableSchema: util.AddressOf(false),
OpenProtocol: &config.OpenProtocolConfig{OutputOldValue: true},
Debezium: &config.DebeziumConfig{OutputOldValue: true},
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ const (
"send-bootstrap-interval-in-sec": 120,
"send-bootstrap-in-msg-count": 10000,
"send-bootstrap-to-all-partition": true,
"send-all-bootstrap-at-start": false,
"debezium-disable-schema": false,
"open": {
"output-old-value": true
Expand Down Expand Up @@ -337,6 +338,7 @@ const (
"send-bootstrap-interval-in-sec": 120,
"send-bootstrap-in-msg-count": 10000,
"send-bootstrap-to-all-partition": true,
"send-all-bootstrap-at-start": false,
"debezium-disable-schema": false,
"open": {
"output-old-value": true
Expand Down Expand Up @@ -511,6 +513,7 @@ const (
"send-bootstrap-interval-in-sec": 120,
"send-bootstrap-in-msg-count": 10000,
"send-bootstrap-to-all-partition": true,
"send-all-bootstrap-at-start": false,
"debezium-disable-schema": false,
"open": {
"output-old-value": true
Expand Down
1 change: 1 addition & 0 deletions pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ var defaultReplicaConfig = &ReplicaConfig{
SendBootstrapIntervalInSec: util.AddressOf(DefaultSendBootstrapIntervalInSec),
SendBootstrapInMsgCount: util.AddressOf(DefaultSendBootstrapInMsgCount),
SendBootstrapToAllPartition: util.AddressOf(DefaultSendBootstrapToAllPartition),
SendAllBootstrapAtStart: util.AddressOf(DefaultSendAllBootstrapAtStart),
DebeziumDisableSchema: util.AddressOf(false),
OpenProtocol: &OpenProtocolConfig{OutputOldValue: true},
Debezium: &DebeziumConfig{OutputOldValue: true},
Expand Down
16 changes: 15 additions & 1 deletion pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ const (
// DefaultSendBootstrapToAllPartition is the default value of
// whether to send bootstrap message to all partitions.
DefaultSendBootstrapToAllPartition = true
// DefaultSendAllBootstrapAtStart is the default value of whether
// to send all tables bootstrap message at changefeed start.
DefaultSendAllBootstrapAtStart = false

// DefaultMaxReconnectToPulsarBroker is the default max reconnect times to pulsar broker.
// The pulsar client uses an exponential backoff with jitter to reconnect to the broker.
Expand Down Expand Up @@ -188,7 +191,8 @@ type SinkConfig struct {
// If set to false, bootstrap message will only be sent to the first partition of each topic.
// Default value is true.
SendBootstrapToAllPartition *bool `toml:"send-bootstrap-to-all-partition" json:"send-bootstrap-to-all-partition,omitempty"`

// SendAllBootstrapAtStart determines whether to send all tables bootstrap message at changefeed start.
SendAllBootstrapAtStart *bool `toml:"send-all-bootstrap-at-start" json:"send-all-bootstrap-at-start,omitempty"`
// Debezium only. Whether schema should be excluded in the output.
DebeziumDisableSchema *bool `toml:"debezium-disable-schema" json:"debezium-disable-schema,omitempty"`

Expand Down Expand Up @@ -227,6 +231,16 @@ func (s *SinkConfig) ShouldSendBootstrapMsg() bool {
util.GetOrZero(s.SendBootstrapInMsgCount) > 0
}

// ShouldSendAllBootstrapAtStart returns whether the should send all bootstrap message at changefeed start.
func (s *SinkConfig) ShouldSendAllBootstrapAtStart() bool {
if s == nil {
return false
}
should := s.ShouldSendBootstrapMsg() && util.GetOrZero(s.SendAllBootstrapAtStart)
log.Info("should send all bootstrap at start", zap.Bool("should", should))
return should
}

// CSVConfig defines a series of configuration items for csv codec.
type CSVConfig struct {
// delimiter between fields, it can be 1 character or at most 2 characters
Expand Down
Loading

0 comments on commit 58636ed

Please sign in to comment.