Skip to content

Commit

Permalink
changefeed(ticdc): send bootstrap message asynchronously to prevent b…
Browse files Browse the repository at this point in the history
…lock other changefeeds (#11573) (#11589)

close #11565
  • Loading branch information
ti-chi-bot authored Sep 12, 2024
1 parent ec9b34c commit e3d1e2a
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 109 deletions.
5 changes: 5 additions & 0 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,10 @@ func (c *changefeed) tick(ctx context.Context,
if err != nil {
return 0, 0, errors.Trace(err)
}
// bootstrap not finished yet, cannot send any event.
if !c.ddlManager.isBootstrapped() {
return 0, 0, nil
}

err = c.handleBarrier(ctx, cfInfo, cfStatus, barrier)
if err != nil {
Expand Down Expand Up @@ -710,6 +714,7 @@ LOOP2:
c.redoMetaMgr,
util.GetOrZero(cfInfo.Config.BDRMode),
cfInfo.Config.Sink.ShouldSendAllBootstrapAtStart(),
c.Throw(ctx),
)

// create scheduler
Expand Down
100 changes: 70 additions & 30 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ type mockDDLSink struct {
syncPoint model.Ts
syncPointHis []model.Ts

bootstrapError bool

wg sync.WaitGroup
}

Expand Down Expand Up @@ -146,16 +148,15 @@ func (m *mockDDLSink) getCheckpointTsAndTableNames() (uint64, []*model.TableInfo
return m.mu.checkpointTs, m.mu.currentTables
}

func (m *mockDDLSink) close(ctx context.Context) error {
func (m *mockDDLSink) close(_ context.Context) error {
m.wg.Wait()
return nil
}

func (m *mockDDLSink) Barrier(ctx context.Context) error {
return nil
}

func (m *mockDDLSink) emitBootstrap(ctx context.Context, bootstrap *model.DDLEvent) error {
func (m *mockDDLSink) emitBootstrap(_ context.Context, bootstrap *model.DDLEvent) error {
if m.bootstrapError {
return errors.New("emit bootstrap error")
}
if m.recordDDLHistory {
m.ddlHistory = append(m.ddlHistory, bootstrap)
}
Expand Down Expand Up @@ -196,8 +197,28 @@ func (m *mockScheduler) DrainCapture(target model.CaptureID) (int, error) {
// Close closes the scheduler and releases resources.
func (m *mockScheduler) Close(ctx context.Context) {}

func newMockDDLSink(_ model.ChangeFeedID, _ *model.ChangeFeedInfo, _ func(error), _ func(error)) DDLSink {
return &mockDDLSink{
resetDDLDone: true,
recordDDLHistory: false,
}
}

func newMockDDLSinkWithBootstrapError(_ model.ChangeFeedID, _ *model.ChangeFeedInfo, _ func(error), _ func(error)) DDLSink {
return &mockDDLSink{
resetDDLDone: true,
recordDDLHistory: false,
bootstrapError: true,
}
}

func newMockPuller(_ context.Context, _ *upstream.Upstream, startTs uint64, _ model.ChangeFeedID, schemaStorage entry.SchemaStorage, _ filter.Filter) puller.DDLPuller {
return &mockDDLPuller{resolvedTs: startTs, schemaStorage: schemaStorage}
}

func createChangefeed4Test(globalVars *vars.GlobalVars,
changefeedInfo *model.ChangeFeedInfo,
newMockDDLSink func(model.ChangeFeedID, *model.ChangeFeedInfo, func(error), func(error)) DDLSink,
t *testing.T,
) (
*changefeed, map[model.CaptureID]*model.CaptureInfo, *orchestrator.ReactorStateTester, *orchestrator.ChangefeedReactorState,
Expand All @@ -220,22 +241,9 @@ func createChangefeed4Test(globalVars *vars.GlobalVars,
cf := newChangefeed4Test(model.DefaultChangeFeedID(changefeedInfo.ID),
state.Info, state.Status, NewFeedStateManager(up, state), up,
// new ddl puller
func(ctx context.Context,
up *upstream.Upstream,
startTs uint64,
changefeed model.ChangeFeedID,
schemaStorage entry.SchemaStorage,
filter filter.Filter,
) puller.DDLPuller {
return &mockDDLPuller{resolvedTs: startTs - 1, schemaStorage: schemaStorage}
},
newMockPuller,
// new ddl ddlSink
func(_ model.ChangeFeedID, _ *model.ChangeFeedInfo, _ func(error), _ func(error)) DDLSink {
return &mockDDLSink{
resetDDLDone: true,
recordDDLHistory: false,
}
},
newMockDDLSink,
// new scheduler
func(
ctx context.Context, id model.ChangeFeedID, up *upstream.Upstream, epoch uint64,
Expand Down Expand Up @@ -267,7 +275,7 @@ func createChangefeed4Test(globalVars *vars.GlobalVars,

func TestPreCheck(t *testing.T) {
globalvars, changefeedVars := vars.NewGlobalVarsAndChangefeedInfo4Test()
_, captures, tester, state := createChangefeed4Test(globalvars, changefeedVars, t)
_, captures, tester, state := createChangefeed4Test(globalvars, changefeedVars, newMockDDLSink, t)
state.CheckCaptureAlive(globalvars.CaptureInfo.ID)
preflightCheck(state, captures)
tester.MustApplyPatches()
Expand All @@ -290,7 +298,7 @@ func TestPreCheck(t *testing.T) {
func TestInitialize(t *testing.T) {
globalvars, changefeedInfo := vars.NewGlobalVarsAndChangefeedInfo4Test()
ctx := context.Background()
cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, t)
cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, newMockDDLSink, t)
defer cf.Close(ctx)
// pre check
state.CheckCaptureAlive(globalvars.CaptureInfo.ID)
Expand All @@ -307,7 +315,7 @@ func TestInitialize(t *testing.T) {
func TestChangefeedHandleError(t *testing.T) {
globalvars, changefeedInfo := vars.NewGlobalVarsAndChangefeedInfo4Test()
ctx := context.Background()
cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, t)
cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, newMockDDLSink, t)
defer cf.Close(ctx)
// pre check
state.CheckCaptureAlive(globalvars.CaptureInfo.ID)
Expand All @@ -326,6 +334,38 @@ func TestChangefeedHandleError(t *testing.T) {
require.Equal(t, state.Info.Error.Message, "fake error")
}

func TestTrySendBootstrapMeetError(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()
_ = helper.DDL2Event("create table test.t(id int primary key, b int)")

ctx := context.Background()
globalVars, changefeedInfo := vars.NewGlobalVarsAndChangefeedInfo4Test()
cf, captures, tester, state := createChangefeed4Test(globalVars, changefeedInfo, newMockDDLSinkWithBootstrapError, t)
cf.upstream.KVStorage = helper.Storage()
defer cf.Close(ctx)

// pre check
state.CheckCaptureAlive(globalVars.CaptureInfo.ID)
require.False(t, preflightCheck(state, captures))
tester.MustApplyPatches()

// initialize
state.Info.Config.Sink.Protocol = util.AddressOf("simple")
state.Info.Config.Sink.SendAllBootstrapAtStart = util.AddressOf(true)
cf.Tick(ctx, state.Info, state.Status, captures)
tester.MustApplyPatches()

require.Eventually(t, func() bool {
cf.Tick(ctx, state.Info, state.Status, captures)
tester.MustApplyPatches()
if state.Info.Error != nil {
return state.Info.State == model.StatePending
}
return false
}, 5*time.Second, 100*time.Millisecond)
}

func TestExecDDL(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()
Expand All @@ -338,7 +378,7 @@ func TestExecDDL(t *testing.T) {
globalvars, changefeedInfo := vars.NewGlobalVarsAndChangefeedInfo4Test()
changefeedInfo.StartTs = startTs
ctx := context.Background()
cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, t)
cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, newMockDDLSink, t)
cf.upstream.KVStorage = helper.Storage()
defer cf.Close(ctx)
tickTwoTime := func() {
Expand Down Expand Up @@ -423,7 +463,7 @@ func TestEmitCheckpointTs(t *testing.T) {
globalvars, changefeedInfo := vars.NewGlobalVarsAndChangefeedInfo4Test()
changefeedInfo.StartTs = startTs
ctx := context.Background()
cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, t)
cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, newMockDDLSink, t)
cf.upstream.KVStorage = helper.Storage()

defer cf.Close(ctx)
Expand Down Expand Up @@ -490,7 +530,7 @@ func TestSyncPoint(t *testing.T) {
changefeedInfo.Config.SyncPointInterval = util.AddressOf(1 * time.Second)
// SyncPoint option is only available for MySQL compatible database.
changefeedInfo.SinkURI = "mysql://"
cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, t)
cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, newMockDDLSink, t)
defer cf.Close(ctx)

// pre check
Expand Down Expand Up @@ -523,7 +563,7 @@ func TestFinished(t *testing.T) {
globalvars, changefeedInfo := vars.NewGlobalVarsAndChangefeedInfo4Test()
ctx := context.Background()
changefeedInfo.TargetTs = changefeedInfo.StartTs + 1000
cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, t)
cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, newMockDDLSink, t)
defer cf.Close(ctx)

// pre check
Expand Down Expand Up @@ -595,7 +635,7 @@ func testChangefeedReleaseResource(
expectedInitialized bool,
) {
var err error
cf, captures, tester, state := createChangefeed4Test(globalVars, changefeedInfo, t)
cf, captures, tester, state := createChangefeed4Test(globalVars, changefeedInfo, newMockDDLSink, t)

// pre check
state.CheckCaptureAlive(globalVars.CaptureInfo.ID)
Expand Down Expand Up @@ -646,7 +686,7 @@ func TestBarrierAdvance(t *testing.T) {
}
changefeedInfo.SinkURI = "mysql://"

cf, captures, tester, state := createChangefeed4Test(globalVars, changefeedInfo, t)
cf, captures, tester, state := createChangefeed4Test(globalVars, changefeedInfo, newMockDDLSink, t)
defer cf.Close(ctx)

// The changefeed load the info from etcd.
Expand Down
Loading

0 comments on commit e3d1e2a

Please sign in to comment.