Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeed(ticdc): send bootstrap message asynchronously to prevent block other changefeeds #11573

Merged
merged 7 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,10 @@ func (c *changefeed) tick(ctx context.Context,
if err != nil {
return 0, 0, errors.Trace(err)
}
// bootstrap not finished yet, cannot send any event.
if !c.ddlManager.isBootstrapped() {
return 0, 0, nil
}

err = c.handleBarrier(ctx, cfInfo, cfStatus, barrier)
if err != nil {
Expand Down Expand Up @@ -718,6 +722,7 @@ LOOP2:
c.redoMetaMgr,
util.GetOrZero(cfInfo.Config.BDRMode),
cfInfo.Config.Sink.ShouldSendAllBootstrapAtStart(),
c.Throw(ctx),
)

// create scheduler
Expand Down
100 changes: 70 additions & 30 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ type mockDDLSink struct {
syncPoint model.Ts
syncPointHis []model.Ts

bootstrapError bool

wg sync.WaitGroup
}

Expand Down Expand Up @@ -146,16 +148,15 @@ func (m *mockDDLSink) getCheckpointTsAndTableNames() (uint64, []*model.TableInfo
return m.mu.checkpointTs, m.mu.currentTables
}

func (m *mockDDLSink) close(ctx context.Context) error {
func (m *mockDDLSink) close(_ context.Context) error {
m.wg.Wait()
return nil
}

func (m *mockDDLSink) Barrier(ctx context.Context) error {
return nil
}

func (m *mockDDLSink) emitBootstrap(ctx context.Context, bootstrap *model.DDLEvent) error {
func (m *mockDDLSink) emitBootstrap(_ context.Context, bootstrap *model.DDLEvent) error {
if m.bootstrapError {
return errors.New("emit bootstrap error")
}
if m.recordDDLHistory {
m.ddlHistory = append(m.ddlHistory, bootstrap)
}
Expand Down Expand Up @@ -196,8 +197,28 @@ func (m *mockScheduler) DrainCapture(target model.CaptureID) (int, error) {
// Close closes the scheduler and releases resources.
func (m *mockScheduler) Close(ctx context.Context) {}

func newMockDDLSink(_ model.ChangeFeedID, _ *model.ChangeFeedInfo, _ func(error), _ func(error)) DDLSink {
return &mockDDLSink{
resetDDLDone: true,
recordDDLHistory: false,
}
}

func newMockDDLSinkWithBootstrapError(_ model.ChangeFeedID, _ *model.ChangeFeedInfo, _ func(error), _ func(error)) DDLSink {
return &mockDDLSink{
resetDDLDone: true,
recordDDLHistory: false,
bootstrapError: true,
}
}

func newMockPuller(_ context.Context, _ *upstream.Upstream, startTs uint64, _ model.ChangeFeedID, schemaStorage entry.SchemaStorage, _ filter.Filter) puller.DDLPuller {
return &mockDDLPuller{resolvedTs: startTs, schemaStorage: schemaStorage}
}

func createChangefeed4Test(globalVars *vars.GlobalVars,
changefeedInfo *model.ChangeFeedInfo,
newMockDDLSink func(model.ChangeFeedID, *model.ChangeFeedInfo, func(error), func(error)) DDLSink,
t *testing.T,
) (
*changefeed, map[model.CaptureID]*model.CaptureInfo, *orchestrator.ReactorStateTester, *orchestrator.ChangefeedReactorState,
Expand All @@ -220,22 +241,9 @@ func createChangefeed4Test(globalVars *vars.GlobalVars,
cf := newChangefeed4Test(model.DefaultChangeFeedID(changefeedInfo.ID),
state.Info, state.Status, NewFeedStateManager(up, state), up,
// new ddl puller
func(ctx context.Context,
up *upstream.Upstream,
startTs uint64,
changefeed model.ChangeFeedID,
schemaStorage entry.SchemaStorage,
filter filter.Filter,
) puller.DDLPuller {
return &mockDDLPuller{resolvedTs: startTs - 1, schemaStorage: schemaStorage}
},
newMockPuller,
// new ddl ddlSink
func(_ model.ChangeFeedID, _ *model.ChangeFeedInfo, _ func(error), _ func(error)) DDLSink {
return &mockDDLSink{
resetDDLDone: true,
recordDDLHistory: false,
}
},
newMockDDLSink,
// new scheduler
func(
ctx context.Context, id model.ChangeFeedID, up *upstream.Upstream, epoch uint64,
Expand Down Expand Up @@ -267,7 +275,7 @@ func createChangefeed4Test(globalVars *vars.GlobalVars,

func TestPreCheck(t *testing.T) {
globalvars, changefeedVars := vars.NewGlobalVarsAndChangefeedInfo4Test()
_, captures, tester, state := createChangefeed4Test(globalvars, changefeedVars, t)
_, captures, tester, state := createChangefeed4Test(globalvars, changefeedVars, newMockDDLSink, t)
state.CheckCaptureAlive(globalvars.CaptureInfo.ID)
preflightCheck(state, captures)
tester.MustApplyPatches()
Expand All @@ -290,7 +298,7 @@ func TestPreCheck(t *testing.T) {
func TestInitialize(t *testing.T) {
globalvars, changefeedInfo := vars.NewGlobalVarsAndChangefeedInfo4Test()
ctx := context.Background()
cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, t)
cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, newMockDDLSink, t)
defer cf.Close(ctx)
// pre check
state.CheckCaptureAlive(globalvars.CaptureInfo.ID)
Expand All @@ -307,7 +315,7 @@ func TestInitialize(t *testing.T) {
func TestChangefeedHandleError(t *testing.T) {
globalvars, changefeedInfo := vars.NewGlobalVarsAndChangefeedInfo4Test()
ctx := context.Background()
cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, t)
cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, newMockDDLSink, t)
defer cf.Close(ctx)
// pre check
state.CheckCaptureAlive(globalvars.CaptureInfo.ID)
Expand All @@ -326,6 +334,38 @@ func TestChangefeedHandleError(t *testing.T) {
require.Equal(t, state.Info.Error.Message, "fake error")
}

func TestTrySendBootstrapMeetError(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()
_ = helper.DDL2Event("create table test.t(id int primary key, b int)")

ctx := context.Background()
globalVars, changefeedInfo := vars.NewGlobalVarsAndChangefeedInfo4Test()
cf, captures, tester, state := createChangefeed4Test(globalVars, changefeedInfo, newMockDDLSinkWithBootstrapError, t)
cf.upstream.KVStorage = helper.Storage()
defer cf.Close(ctx)

// pre check
state.CheckCaptureAlive(globalVars.CaptureInfo.ID)
require.False(t, preflightCheck(state, captures))
tester.MustApplyPatches()

// initialize
state.Info.Config.Sink.Protocol = util.AddressOf("simple")
state.Info.Config.Sink.SendAllBootstrapAtStart = util.AddressOf(true)
cf.Tick(ctx, state.Info, state.Status, captures)
tester.MustApplyPatches()

require.Eventually(t, func() bool {
cf.Tick(ctx, state.Info, state.Status, captures)
tester.MustApplyPatches()
if state.Info.Error != nil {
return state.Info.State == model.StatePending
}
return false
}, 5*time.Second, 100*time.Millisecond)
}

func TestExecDDL(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()
Expand All @@ -338,7 +378,7 @@ func TestExecDDL(t *testing.T) {
globalvars, changefeedInfo := vars.NewGlobalVarsAndChangefeedInfo4Test()
changefeedInfo.StartTs = startTs
ctx := context.Background()
cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, t)
cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, newMockDDLSink, t)
cf.upstream.KVStorage = helper.Storage()
defer cf.Close(ctx)
tickTwoTime := func() {
Expand Down Expand Up @@ -423,7 +463,7 @@ func TestEmitCheckpointTs(t *testing.T) {
globalvars, changefeedInfo := vars.NewGlobalVarsAndChangefeedInfo4Test()
changefeedInfo.StartTs = startTs
ctx := context.Background()
cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, t)
cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, newMockDDLSink, t)
cf.upstream.KVStorage = helper.Storage()

defer cf.Close(ctx)
Expand Down Expand Up @@ -490,7 +530,7 @@ func TestSyncPoint(t *testing.T) {
changefeedInfo.Config.SyncPointInterval = util.AddressOf(1 * time.Second)
// SyncPoint option is only available for MySQL compatible database.
changefeedInfo.SinkURI = "mysql://"
cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, t)
cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, newMockDDLSink, t)
defer cf.Close(ctx)

// pre check
Expand Down Expand Up @@ -523,7 +563,7 @@ func TestFinished(t *testing.T) {
globalvars, changefeedInfo := vars.NewGlobalVarsAndChangefeedInfo4Test()
ctx := context.Background()
changefeedInfo.TargetTs = changefeedInfo.StartTs + 1000
cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, t)
cf, captures, tester, state := createChangefeed4Test(globalvars, changefeedInfo, newMockDDLSink, t)
defer cf.Close(ctx)

// pre check
Expand Down Expand Up @@ -595,7 +635,7 @@ func testChangefeedReleaseResource(
expectedInitialized bool,
) {
var err error
cf, captures, tester, state := createChangefeed4Test(globalVars, changefeedInfo, t)
cf, captures, tester, state := createChangefeed4Test(globalVars, changefeedInfo, newMockDDLSink, t)

// pre check
state.CheckCaptureAlive(globalVars.CaptureInfo.ID)
Expand Down Expand Up @@ -646,7 +686,7 @@ func TestBarrierAdvance(t *testing.T) {
}
changefeedInfo.SinkURI = "mysql://"

cf, captures, tester, state := createChangefeed4Test(globalVars, changefeedInfo, t)
cf, captures, tester, state := createChangefeed4Test(globalVars, changefeedInfo, newMockDDLSink, t)
defer cf.Close(ctx)

// The changefeed load the info from etcd.
Expand Down
Loading
Loading