diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go
index 6210415435e..1713618314b 100755
--- a/cdc/owner/changefeed.go
+++ b/cdc/owner/changefeed.go
@@ -429,6 +429,10 @@ func (c *changefeed) tick(ctx context.Context,
 	if err != nil {
 		return 0, 0, errors.Trace(err)
 	}
+	// bootstrap not finished yet, cannot send any event.
+	if !c.ddlManager.isBootstrapped() {
+		return 0, 0, nil
+	}
 
 	err = c.handleBarrier(ctx, cfInfo, cfStatus, barrier)
 	if err != nil {
@@ -718,6 +722,7 @@ LOOP2:
 		c.redoMetaMgr,
 		util.GetOrZero(cfInfo.Config.BDRMode),
 		cfInfo.Config.Sink.ShouldSendAllBootstrapAtStart(),
+		c.Throw(ctx),
 	)
 
 	// create scheduler
diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go
index 29b70772d8c..ed85470d5a2 100644
--- a/cdc/owner/changefeed_test.go
+++ b/cdc/owner/changefeed_test.go
@@ -98,6 +98,8 @@ type mockDDLSink struct {
 	syncPoint    model.Ts
 	syncPointHis []model.Ts
 
+	bootstrapError bool
+
 	wg sync.WaitGroup
 }
 
@@ -146,16 +148,15 @@ func (m *mockDDLSink) getCheckpointTsAndTableNames() (uint64, []*model.TableInfo
 	return m.mu.checkpointTs, m.mu.currentTables
 }
 
-func (m *mockDDLSink) close(ctx context.Context) error {
+func (m *mockDDLSink) close(_ context.Context) error {
 	m.wg.Wait()
 	return nil
 }
 
-func (m *mockDDLSink) Barrier(ctx context.Context) error {
-	return nil
-}
-
-func (m *mockDDLSink) emitBootstrap(ctx context.Context, bootstrap *model.DDLEvent) error {
+func (m *mockDDLSink) emitBootstrap(_ context.Context, bootstrap *model.DDLEvent) error {
+	if m.bootstrapError {
+		return errors.New("emit bootstrap error")
+	}
 	if m.recordDDLHistory {
 		m.ddlHistory = append(m.ddlHistory, bootstrap)
 	}
@@ -196,8 +197,31 @@ func (m *mockScheduler) DrainCapture(target model.CaptureID) (int, error) {
 // Close closes the scheduler and releases resources.
 func (m *mockScheduler) Close(ctx context.Context) {}
 
+// newMockDDLSink builds a mockDDLSink with bootstrapError unset, so emitBootstrap injects no error.
+func newMockDDLSink(_ model.ChangeFeedID, _ *model.ChangeFeedInfo, _ func(error), _ func(error)) DDLSink {
+	return &mockDDLSink{
+		resetDDLDone:     true,
+		recordDDLHistory: false,
+	}
+}
+
+// newMockDDLSinkWithBootstrapError builds a mockDDLSink whose emitBootstrap always fails.
+func newMockDDLSinkWithBootstrapError(_ model.ChangeFeedID, _ *model.ChangeFeedInfo, _ func(error), _ func(error)) DDLSink {
+	return &mockDDLSink{
+		resetDDLDone:     true,
+		recordDDLHistory: false,
+		bootstrapError:   true,
+	}
+}
+
+// newMockPuller builds a mockDDLPuller whose resolvedTs starts at startTs.
+func newMockPuller(_ context.Context, _ *upstream.Upstream, startTs uint64, _ model.ChangeFeedID, schemaStorage entry.SchemaStorage, _ filter.Filter) puller.DDLPuller {
+	return &mockDDLPuller{resolvedTs: startTs, schemaStorage: schemaStorage}
+}
+
 func createChangefeed4Test(globalVars *vars.GlobalVars,
 	changefeedInfo *model.ChangeFeedInfo,
+	newDDLSink func(model.ChangeFeedID, *model.ChangeFeedInfo, func(error), func(error)) DDLSink,
 	t *testing.T,
 ) (
 	*changefeed, map[model.CaptureID]*model.CaptureInfo, *orchestrator.ReactorStateTester, *orchestrator.ChangefeedReactorState,
@@ -220,22 +244,9 @@ func createChangefeed4Test(globalVars *vars.GlobalVars,
 	cf := newChangefeed4Test(model.DefaultChangeFeedID(changefeedInfo.ID),
 		state.Info, state.Status, NewFeedStateManager(up, state), up,
 		// new ddl puller
-		func(ctx context.Context,
-			up *upstream.Upstream,
-			startTs uint64,
-			changefeed model.ChangeFeedID,
-			schemaStorage entry.SchemaStorage,
-			filter filter.Filter,
-		) puller.DDLPuller {
-			return &mockDDLPuller{resolvedTs: startTs - 1, schemaStorage: schemaStorage}
-		},
+		newMockPuller,
 		// new ddl ddlSink
-		func(_ model.ChangeFeedID, _ *model.ChangeFeedInfo, _ func(error), _ func(error)) DDLSink {
-			return &mockDDLSink{
-				resetDDLDone:     true,
-				recordDDLHistory: false,
-			}
-		},
+		newDDLSink,
 		// new scheduler
 		func(
 			ctx context.Context, id model.ChangeFeedID, up *upstream.Upstream, epoch uint64,
@@ -267,7 +278,7 @@ func createChangefeed4Test(globalVars *vars.GlobalVars,
 
 func TestPreCheck(t *testing.T) {
 	globalvars, changefeedVars := vars.NewGlobalVarsAndChangefeedInfo4Test()
-	_, captures, tester, state := createChangefeed4Test(globalvars, changefeedVars, t)
+	_, captures, tester, state := createChangefeed4Test(globalvars, changefeedVars, newMockDDLSink, t)
 	state.CheckCaptureAlive(globalvars.CaptureInfo.ID)
 	preflightCheck(state, captures)
 	tester.MustApplyPatches()
@@ -290,7 +301,7 @@ func TestPreCheck(t *testing.T) {
 func TestInitialize(t *testing.T) {
 	globalvars, changefeedInfo := vars.NewGlobalVarsAndChangefeedInfo4Test()
 	ctx := context.Background()
-	cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, t)
+	cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, newMockDDLSink, t)
 	defer cf.Close(ctx)
 	// pre check
 	state.CheckCaptureAlive(globalvars.CaptureInfo.ID)
@@ -307,7 +318,7 @@ func TestInitialize(t *testing.T) {
 func TestChangefeedHandleError(t *testing.T) {
 	globalvars, changefeedInfo := vars.NewGlobalVarsAndChangefeedInfo4Test()
 	ctx := context.Background()
-	cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, t)
+	cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, newMockDDLSink, t)
 	defer cf.Close(ctx)
 	// pre check
 	state.CheckCaptureAlive(globalvars.CaptureInfo.ID)
@@ -326,6 +337,40 @@ func TestChangefeedHandleError(t *testing.T) {
 	require.Equal(t, state.Info.Error.Message, "fake error")
 }
 
+// TestTrySendBootstrapMeetError checks that a bootstrap emit failure is reported
+// on the changefeed and moves it into the pending state.
+func TestTrySendBootstrapMeetError(t *testing.T) {
+	helper := entry.NewSchemaTestHelper(t)
+	defer helper.Close()
+	_ = helper.DDL2Event("create table test.t(id int primary key, b int)")
+
+	ctx := context.Background()
+	globalVars, changefeedInfo := vars.NewGlobalVarsAndChangefeedInfo4Test()
+	cf, captures, tester, state := createChangefeed4Test(globalVars, changefeedInfo, newMockDDLSinkWithBootstrapError, t)
+	cf.upstream.KVStorage = helper.Storage()
+	defer cf.Close(ctx)
+
+	// pre check
+	state.CheckCaptureAlive(globalVars.CaptureInfo.ID)
+	require.False(t, preflightCheck(state, captures))
+	tester.MustApplyPatches()
+
+	// initialize
+	state.Info.Config.Sink.Protocol = util.AddressOf("simple")
+	state.Info.Config.Sink.SendAllBootstrapAtStart = util.AddressOf(true)
+	cf.Tick(ctx, state.Info, state.Status, captures)
+	tester.MustApplyPatches()
+
+	require.Eventually(t, func() bool {
+		cf.Tick(ctx, state.Info, state.Status, captures)
+		tester.MustApplyPatches()
+		if state.Info.Error != nil {
+			return state.Info.State == model.StatePending
+		}
+		return false
+	}, 5*time.Second, 100*time.Millisecond)
+}
+
 func TestExecDDL(t *testing.T) {
 	helper := entry.NewSchemaTestHelper(t)
 	defer helper.Close()
@@ -338,7 +383,7 @@ func TestExecDDL(t *testing.T) {
 	globalvars, changefeedInfo := vars.NewGlobalVarsAndChangefeedInfo4Test()
 	changefeedInfo.StartTs = startTs
 	ctx := context.Background()
-	cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, t)
+	cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, newMockDDLSink, t)
 	cf.upstream.KVStorage = helper.Storage()
 	defer cf.Close(ctx)
 	tickTwoTime := func() {
@@ -423,7 +468,7 @@ func TestEmitCheckpointTs(t *testing.T) {
 	globalvars, changefeedInfo := vars.NewGlobalVarsAndChangefeedInfo4Test()
 	changefeedInfo.StartTs = startTs
 	ctx := context.Background()
-	cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, t)
+	cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, newMockDDLSink, t)
 	cf.upstream.KVStorage = helper.Storage()
 	defer cf.Close(ctx)
 
@@ -490,7 +535,7 @@ func TestSyncPoint(t *testing.T) {
 	changefeedInfo.Config.SyncPointInterval = util.AddressOf(1 * time.Second)
 	// SyncPoint option is only available for MySQL compatible database.
 	changefeedInfo.SinkURI = "mysql://"
-	cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, t)
+	cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, newMockDDLSink, t)
 	defer cf.Close(ctx)
 
 	// pre check
@@ -523,7 +568,7 @@ func TestFinished(t *testing.T) {
 	globalvars, changefeedInfo := vars.NewGlobalVarsAndChangefeedInfo4Test()
 	ctx := context.Background()
 	changefeedInfo.TargetTs = changefeedInfo.StartTs + 1000
-	cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, t)
+	cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, newMockDDLSink, t)
 	defer cf.Close(ctx)
 
 	// pre check
@@ -595,7 +640,7 @@ func testChangefeedReleaseResource(
 	expectedInitialized bool,
 ) {
 	var err error
-	cf, captures, tester, state := createChangefeed4Test(globalVars, changefeedInfo, t)
+	cf, captures, tester, state := createChangefeed4Test(globalVars, changefeedInfo, newMockDDLSink, t)
 
 	// pre check
 	state.CheckCaptureAlive(globalVars.CaptureInfo.ID)
@@ -646,7 +691,7 @@ func TestBarrierAdvance(t *testing.T) {
 		}
 		changefeedInfo.SinkURI = "mysql://"
 
-		cf, captures, tester, state := createChangefeed4Test(globalVars, changefeedInfo, t)
+		cf, captures, tester, state := createChangefeed4Test(globalVars, changefeedInfo, newMockDDLSink, t)
 		defer cf.Close(ctx)
 
 		// The changefeed load the info from etcd.
diff --git a/cdc/owner/ddl_manager.go b/cdc/owner/ddl_manager.go
index ee81a4adfdb..853b5cee8ae 100644
--- a/cdc/owner/ddl_manager.go
+++ b/cdc/owner/ddl_manager.go
@@ -17,6 +17,7 @@ import (
 	"context"
 	"math/rand"
 	"sort"
+	"sync/atomic"
 	"time"
 
 	"github.com/pingcap/errors"
@@ -131,8 +132,27 @@ type ddlManager struct {
 	BDRMode       bool
 	ddlResolvedTs model.Ts
 
-	shouldSendAllBootstrapAtStart bool
-	bootstraped                   bool
+	bootstrapState bootstrapState
+	reportError    func(err error)
+}
+
+// bootstrapState tracks the progress of sending bootstrap messages.
+type bootstrapState int32
+
+const (
+	bootstrapNotStarted bootstrapState = iota
+	bootstrapInProgress
+	bootstrapFinished
+)
+
+// storeBootstrapState atomically writes state to addr.
+func storeBootstrapState(addr *bootstrapState, state bootstrapState) {
+	atomic.StoreInt32((*int32)(addr), int32(state))
+}
+
+// loadBootstrapState atomically reads the state stored at addr.
+func loadBootstrapState(addr *bootstrapState) bootstrapState {
+	return bootstrapState(atomic.LoadInt32((*int32)(addr)))
 }
 
 func newDDLManager(
@@ -147,6 +167,7 @@ func newDDLManager(
 	redoMetaManager redo.MetaManager,
 	bdrMode bool,
 	shouldSendAllBootstrapAtStart bool,
+	reportError func(err error),
 ) *ddlManager {
 	log.Info("owner create ddl manager",
 		zap.String("namespace", changefeedID.Namespace),
@@ -155,57 +176,81 @@ func newDDLManager(
 		zap.Uint64("checkpointTs", checkpointTs),
 		zap.Bool("bdrMode", bdrMode))
 
+	bootstrap := bootstrapFinished
+	if shouldSendAllBootstrapAtStart {
+		bootstrap = bootstrapNotStarted
+	}
+
 	return &ddlManager{
-		changfeedID:                   changefeedID,
-		ddlSink:                       ddlSink,
-		filter:                        filter,
-		ddlPuller:                     ddlPuller,
-		schema:                        schema,
-		redoDDLManager:                redoManager,
-		redoMetaManager:               redoMetaManager,
-		startTs:                       startTs,
-		checkpointTs:                  checkpointTs,
-		ddlResolvedTs:                 startTs,
-		BDRMode:                       bdrMode,
-		pendingDDLs:                   make(map[model.TableName][]*model.DDLEvent),
-		shouldSendAllBootstrapAtStart: shouldSendAllBootstrapAtStart,
+		changfeedID:     changefeedID,
+		ddlSink:         ddlSink,
+		filter:          filter,
+		ddlPuller:       ddlPuller,
+		schema:          schema,
+		redoDDLManager:  redoManager,
+		redoMetaManager: redoMetaManager,
+		startTs:         startTs,
+		checkpointTs:    checkpointTs,
+		ddlResolvedTs:   startTs,
+		BDRMode:         bdrMode,
+		pendingDDLs:     make(map[model.TableName][]*model.DDLEvent),
+		bootstrapState:  bootstrap,
+		reportError:     reportError,
 	}
 }
 
-func (m *ddlManager) checkAndSendBootstrapMsgs(ctx context.Context) (bool, error) {
-	if !m.shouldSendAllBootstrapAtStart || m.bootstraped {
-		return true, nil
-	}
+// isBootstrapped reports whether the bootstrap state is finished.
+func (m *ddlManager) isBootstrapped() bool {
+	return loadBootstrapState(&m.bootstrapState) == bootstrapFinished
+}
+
+// trySendBootstrap reports whether bootstrap has finished. On the first call with
+// bootstrap not started, it starts a goroutine that emits one bootstrap event per
+// non-view table; an emit failure goes to reportError and the state stays in progress.
+func (m *ddlManager) trySendBootstrap(ctx context.Context, currentTables []*model.TableInfo) bool {
+	bootstrap := loadBootstrapState(&m.bootstrapState)
+	switch bootstrap {
+	case bootstrapFinished:
+		return true
+	case bootstrapInProgress:
+		return false
+	case bootstrapNotStarted:
+	}
+	storeBootstrapState(&m.bootstrapState, bootstrapInProgress)
 	start := time.Now()
-	defer func() {
+	go func() {
+		log.Info("start to send bootstrap messages",
+			zap.Stringer("changefeed", m.changfeedID),
+			zap.Int("tables", len(currentTables)))
+		emitted := 0
+		for _, table := range currentTables {
+			if table.TableInfo.IsView() {
+				continue
+			}
+			ddlEvent := &model.DDLEvent{
+				TableInfo:   table,
+				IsBootstrap: true,
+			}
+			err := m.ddlSink.emitBootstrap(ctx, ddlEvent)
+			if err != nil {
+				log.Error("send bootstrap message failed",
+					zap.Stringer("changefeed", m.changfeedID),
+					zap.Int("tables", len(currentTables)),
+					zap.Int("emitted", emitted),
+					zap.Duration("duration", time.Since(start)),
+					zap.Error(err))
+				m.reportError(err)
+				return
+			}
+			emitted++
+		}
+		storeBootstrapState(&m.bootstrapState, bootstrapFinished)
 		log.Info("send bootstrap messages finished",
 			zap.Stringer("changefeed", m.changfeedID),
+			zap.Int("tables", len(currentTables)),
 			zap.Duration("cost", time.Since(start)))
 	}()
-	// Send bootstrap messages to downstream.
-	tableInfo, err := m.allTables(ctx)
-	if err != nil {
-		return false, errors.Trace(err)
-	}
-	log.Info("start to send bootstrap messages",
-		zap.Stringer("changefeed", m.changfeedID),
-		zap.Int("tables", len(tableInfo)))
-
-	for _, table := range tableInfo {
-		if table.TableInfo.IsView() {
-			continue
-		}
-		ddlEvent := &model.DDLEvent{
-			TableInfo:   table,
-			IsBootstrap: true,
-		}
-		err := m.ddlSink.emitBootstrap(ctx, ddlEvent)
-		if err != nil {
-			return false, errors.Trace(err)
-		}
-	}
-	m.bootstraped = true
-	return true, nil
+	return m.isBootstrapped()
 }
 
 // tick the ddlHandler, it does the following things:
@@ -224,19 +269,17 @@ func (m *ddlManager) tick(
 	m.justSentDDL = nil
 	m.checkpointTs = checkpointTs
 
-	ok, err := m.checkAndSendBootstrapMsgs(ctx)
+	currentTables, err := m.allTables(ctx)
 	if err != nil {
 		return nil, nil, errors.Trace(err)
 	}
+
+	// before bootstrap finished, cannot send any event.
+	ok := m.trySendBootstrap(ctx, currentTables)
 	if !ok {
 		return nil, nil, nil
 	}
 
-	currentTables, err := m.allTables(ctx)
-	if err != nil {
-		return nil, nil, errors.Trace(err)
-	}
-
 	if m.executingDDL == nil {
 		m.ddlSink.emitCheckpointTs(m.checkpointTs, currentTables)
 	}
diff --git a/cdc/owner/ddl_manager_test.go b/cdc/owner/ddl_manager_test.go
index e72ecd846d8..75757179c5d 100644
--- a/cdc/owner/ddl_manager_test.go
+++ b/cdc/owner/ddl_manager_test.go
@@ -18,24 +18,25 @@ import (
 	"encoding/json"
 	"fmt"
 	"testing"
+	"time"
 
 	timodel "github.com/pingcap/tidb/pkg/parser/model"
 	"github.com/pingcap/tiflow/cdc/entry"
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/cdc/redo"
 	"github.com/pingcap/tiflow/cdc/scheduler/schedulepb"
-	config2 "github.com/pingcap/tiflow/pkg/config"
+	"github.com/pingcap/tiflow/pkg/config"
 	"github.com/pingcap/tiflow/pkg/filter"
 	"github.com/pingcap/tiflow/pkg/util"
 	"github.com/stretchr/testify/require"
 )
 
-func createDDLManagerForTest(t *testing.T) *ddlManager {
+func createDDLManagerForTest(t *testing.T, shouldSendAllBootstrapAtStart bool) *ddlManager {
 	startTs, checkpointTs := model.Ts(0), model.Ts(1)
 	changefeedID := model.DefaultChangeFeedID("ddl-manager-test")
 	ddlSink := &mockDDLSink{}
 	ddlPuller := &mockDDLPuller{}
-	cfg := config2.GetDefaultReplicaConfig()
+	cfg := config.GetDefaultReplicaConfig()
 	f, err := filter.NewFilter(cfg, "")
 	require.Nil(t, err)
 	schema, err := entry.NewSchemaStorage(nil, startTs, cfg.ForceReplicate, changefeedID, util.RoleTester, f)
@@ -51,7 +52,8 @@ func createDDLManagerForTest(t *testing.T) *ddlManager {
 		redo.NewDisabledDDLManager(),
 		redo.NewDisabledMetaManager(),
 		false,
-		false,
+		shouldSendAllBootstrapAtStart,
+		func(err error) {},
 	)
 	return res
 }
@@ -77,7 +79,7 @@ func newFakeDDLEvent(
 }
 
 func TestGetNextDDL(t *testing.T) {
-	dm := createDDLManagerForTest(t)
+	dm := createDDLManagerForTest(t, false)
 	dm.executingDDL = newFakeDDLEvent(1, "test_1", timodel.ActionDropColumn, 1)
 	require.Equal(t, dm.executingDDL, dm.getNextDDL())
 
@@ -95,7 +97,7 @@ func TestGetNextDDL(t *testing.T) {
 }
 
 func TestBarriers(t *testing.T) {
-	dm := createDDLManagerForTest(t)
+	dm := createDDLManagerForTest(t, false)
 
 	tableID1 := int64(1)
 	tableName1 := model.TableName{Table: "test_1", TableID: tableID1}
@@ -145,7 +147,7 @@ func TestBarriers(t *testing.T) {
 }
 
 func TestGetSnapshotTs(t *testing.T) {
-	dm := createDDLManagerForTest(t)
+	dm := createDDLManagerForTest(t, false)
 	dm.startTs = 0
 	dm.checkpointTs = 1
 	require.Equal(t, dm.startTs, dm.getSnapshotTs())
@@ -166,7 +168,7 @@ func TestExecRenameTablesDDL(t *testing.T) {
 	helper := entry.NewSchemaTestHelper(t)
 	defer helper.Close()
 	ctx := context.Background()
-	dm := createDDLManagerForTest(t)
+	dm := createDDLManagerForTest(t, false)
 	mockDDLSink := dm.ddlSink.(*mockDDLSink)
 
 	var oldSchemaIDs, newSchemaIDs, oldTableIDs []int64
@@ -267,7 +269,7 @@ func TestExecDropTablesDDL(t *testing.T) {
 	defer helper.Close()
 
 	ctx := context.Background()
-	dm := createDDLManagerForTest(t)
+	dm := createDDLManagerForTest(t, false)
 	mockDDLSink := dm.ddlSink.(*mockDDLSink)
 
 	execCreateStmt := func(actualDDL, expectedDDL string) {
@@ -332,7 +334,7 @@ func TestExecDropViewsDDL(t *testing.T) {
 	defer helper.Close()
 
 	ctx := context.Background()
-	dm := createDDLManagerForTest(t)
+	dm := createDDLManagerForTest(t, false)
 	mockDDLSink := dm.ddlSink.(*mockDDLSink)
 
 	execCreateStmt := func(actualDDL, expectedDDL string) {
@@ -462,36 +464,46 @@ func TestIsGlobalDDL(t *testing.T) {
 	}
 }
 
-func TestCheckAndSendBootstrapMsgs(t *testing.T) {
+// TestTrySendBootstrap checks that trySendBootstrap emits nothing when bootstrap at
+// start is disabled and emits one bootstrap event per table when it is enabled.
+func TestTrySendBootstrap(t *testing.T) {
 	helper := entry.NewSchemaTestHelper(t)
 	defer helper.Close()
 	ddl1 := helper.DDL2Event("create table test.tb1(id int primary key)")
 	ddl2 := helper.DDL2Event("create table test.tb2(id int primary key)")
 
 	ctx := context.Background()
-	dm := createDDLManagerForTest(t)
+	dm := createDDLManagerForTest(t, false)
 	dm.schema = helper.SchemaStorage()
 	dm.startTs, dm.checkpointTs = ddl2.CommitTs, ddl2.CommitTs
 
-	mockDDLSink := dm.ddlSink.(*mockDDLSink)
-	mockDDLSink.recordDDLHistory = true
+	mock := dm.ddlSink.(*mockDDLSink)
+	mock.recordDDLHistory = true
 
 	// do not send all bootstrap messages
-	send, err := dm.checkAndSendBootstrapMsgs(ctx)
-	require.Nil(t, err)
-	require.True(t, send)
-	require.False(t, dm.bootstraped)
-	require.Equal(t, 0, len(mockDDLSink.ddlHistory))
+	currentTables, err := dm.allTables(ctx)
+	require.NoError(t, err)
+	require.Equal(t, 2, len(currentTables))
+	ok := dm.trySendBootstrap(ctx, currentTables)
+	require.True(t, ok)
+	require.True(t, dm.isBootstrapped())
+	require.Equal(t, 0, len(mock.ddlHistory))
 
 	// send all bootstrap messages -> tb1 and tb2
-	dm.shouldSendAllBootstrapAtStart = true
-	send, err = dm.checkAndSendBootstrapMsgs(ctx)
-	require.Nil(t, err)
-	require.True(t, send)
-	require.True(t, dm.bootstraped)
-	require.Equal(t, 2, len(mockDDLSink.ddlHistory))
-	require.True(t, mockDDLSink.ddlHistory[0].IsBootstrap)
-	require.True(t, mockDDLSink.ddlHistory[1].IsBootstrap)
-	require.Equal(t, ddl1.TableInfo.TableName, mockDDLSink.ddlHistory[0].TableInfo.TableName)
-	require.Equal(t, ddl2.TableInfo.TableName, mockDDLSink.ddlHistory[1].TableInfo.TableName)
+	dm = createDDLManagerForTest(t, true)
+	mock = dm.ddlSink.(*mockDDLSink)
+	mock.recordDDLHistory = true
+	dm.schema = helper.SchemaStorage()
+	dm.startTs, dm.checkpointTs = ddl2.CommitTs, ddl2.CommitTs
+
+	_ = dm.trySendBootstrap(ctx, currentTables)
+	require.Eventually(t, func() bool {
+		return dm.trySendBootstrap(ctx, currentTables)
+	}, 5*time.Second, 100*time.Millisecond)
+
+	require.Equal(t, 2, len(mock.ddlHistory))
+	require.True(t, mock.ddlHistory[0].IsBootstrap)
+	require.True(t, mock.ddlHistory[1].IsBootstrap)
+	require.Equal(t, ddl1.TableInfo.TableName, mock.ddlHistory[0].TableInfo.TableName)
+	require.Equal(t, ddl2.TableInfo.TableName, mock.ddlHistory[1].TableInfo.TableName)
 }