diff --git a/cdc/sink/ddlsink/mysql/async_ddl.go b/cdc/sink/ddlsink/mysql/async_ddl.go index 1692a394f37..8e480bb06b0 100644 --- a/cdc/sink/ddlsink/mysql/async_ddl.go +++ b/cdc/sink/ddlsink/mysql/async_ddl.go @@ -15,25 +15,29 @@ package mysql import ( "context" - "database/sql" "fmt" "time" "github.com/pingcap/log" + "github.com/pingcap/tidb/dumpling/export" timodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/errors" "go.uber.org/zap" ) +const timeout = 5 * time.Second + +// TODO: Use the flollowing SQL to check the ddl job status after tidb optimize +// the information_schema.ddl_jobs table. Ref: https://github.com/pingcap/tidb/issues/55725 +// +// SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, STATE, QUERY +// FROM information_schema.ddl_jobs var checkRunningAddIndexSQL = ` -SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, STATE, QUERY -FROM information_schema.ddl_jobs +ADMIN SHOW DDL JOBS 1 WHERE DB_NAME = "%s" AND TABLE_NAME = "%s" AND JOB_TYPE LIKE "add index%%" - AND (STATE = "running" OR STATE = "queueing") -LIMIT 1; + AND (STATE = "running" OR STATE = "queueing"); ` func (m *DDLSink) shouldAsyncExecDDL(ddl *model.DDLEvent) bool { @@ -92,9 +96,23 @@ func (m *DDLSink) asyncExecDDL(ctx context.Context, ddl *model.DDLEvent) error { } } +func (m *DDLSink) needWaitAsyncExecDone(t timodel.ActionType) bool { + if !m.cfg.IsTiDB { + return false + } + switch t { + case timodel.ActionCreateTable, timodel.ActionCreateTables: + return false + case timodel.ActionCreateSchema: + return false + default: + return true + } +} + // Should always wait for async ddl done before executing the next ddl. func (m *DDLSink) waitAsynExecDone(ctx context.Context, ddl *model.DDLEvent) { - if !m.cfg.IsTiDB { + if !m.needWaitAsyncExecDone(ddl.Type) { return } @@ -105,9 +123,6 @@ func (m *DDLSink) waitAsynExecDone(ctx context.Context, ddl *model.DDLEvent) { if ddl.PreTableInfo != nil { tables[ddl.PreTableInfo.TableName] = struct{}{} } - if len(tables) == 0 || m.checkAsyncExecDDLDone(ctx, tables) { - return - } log.Debug("wait async exec ddl done", zap.String("namespace", m.id.Namespace), @@ -115,6 +130,10 @@ func (m *DDLSink) waitAsynExecDone(ctx context.Context, ddl *model.DDLEvent) { zap.Any("tables", tables), zap.Uint64("commitTs", ddl.CommitTs), zap.String("ddl", ddl.Query)) + if len(tables) == 0 || m.checkAsyncExecDDLDone(ctx, tables) { + return + } + ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() for { @@ -131,6 +150,8 @@ func (m *DDLSink) waitAsynExecDone(ctx context.Context, ddl *model.DDLEvent) { } func (m *DDLSink) checkAsyncExecDDLDone(ctx context.Context, tables map[model.TableName]struct{}) bool { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() for table := range tables { done := m.doCheck(ctx, table) if !done { @@ -141,6 +162,7 @@ func (m *DDLSink) checkAsyncExecDDLDone(ctx context.Context, tables map[model.Ta } func (m *DDLSink) doCheck(ctx context.Context, table model.TableName) (done bool) { + start := time.Now() if v, ok := m.lastExecutedNormalDDLCache.Get(table); ok { ddlType := v.(timodel.ActionType) if ddlType == timodel.ActionAddIndex { @@ -152,35 +174,41 @@ func (m *DDLSink) doCheck(ctx context.Context, table model.TableName) (done bool return true } - ret := m.db.QueryRowContext(ctx, fmt.Sprintf(checkRunningAddIndexSQL, table.Schema, table.Table)) - if ret.Err() != nil { + rows, err := m.db.QueryContext(ctx, fmt.Sprintf(checkRunningAddIndexSQL, table.Schema, table.Table)) + defer func() { + if rows != nil { + _ = rows.Err() + } + }() + if err != nil { log.Error("check async exec ddl failed", zap.String("namespace", m.id.Namespace), zap.String("changefeed", m.id.ID), - zap.Error(ret.Err())) + zap.Error(err)) return true } - var jobID, jobType, schemaState, schemaID, tableID, state, query string - if err := ret.Scan(&jobID, &jobType, &schemaState, &schemaID, &tableID, &state, &query); err != nil { - if !errors.Is(err, sql.ErrNoRows) { - log.Error("check async exec ddl failed", - zap.String("namespace", m.id.Namespace), - zap.String("changefeed", m.id.ID), - zap.Error(err)) - } + rets, err := export.GetSpecifiedColumnValuesAndClose(rows, "JOB_ID", "JOB_TYPE", "SCHEMA_STATE", "STATE") + if err != nil { + log.Error("check async exec ddl failed", + zap.String("namespace", m.id.Namespace), + zap.String("changefeed", m.id.ID), + zap.Error(err)) return true } + if len(rets) == 0 { + return true + } + ret := rets[0] + jobID, jobType, schemaState, state := ret[0], ret[1], ret[2], ret[3] log.Info("async ddl is still running", zap.String("namespace", m.id.Namespace), zap.String("changefeed", m.id.ID), + zap.Duration("checkDuration", time.Since(start)), zap.String("table", table.String()), zap.String("jobID", jobID), zap.String("jobType", jobType), zap.String("schemaState", schemaState), - zap.String("schemaID", schemaID), - zap.String("tableID", tableID), - zap.String("state", state), - zap.String("query", query)) + zap.String("state", state)) return false } diff --git a/cdc/sink/ddlsink/mysql/async_ddl_test.go b/cdc/sink/ddlsink/mysql/async_ddl_test.go index 5f5802b30b6..9430de508b7 100644 --- a/cdc/sink/ddlsink/mysql/async_ddl_test.go +++ b/cdc/sink/ddlsink/mysql/async_ddl_test.go @@ -51,8 +51,14 @@ func TestWaitAsynExecDone(t *testing.T) { // Case 1: there is a running add index job mock.ExpectQuery(fmt.Sprintf(checkRunningAddIndexSQL, "test", "sbtest0")).WillReturnRows( - sqlmock.NewRows([]string{"JOB_ID", "JOB_TYPE", "SCHEMA_STATE", "SCHEMA_ID", "TABLE_ID", "STATE", "QUERY"}). - AddRow("1", "add index", "running", "1", "1", "running", "Create index idx1 on test.sbtest0(a)"), + sqlmock.NewRows([]string{ + "JOB_ID", "DB_NAME", "TABLE_NAME", "JOB_TYPE", "SCHEMA_STATE", "SCHEMA_ID", "TABLE_ID", + "ROW_COUNT", "CREATE_TIME", "START_TIME", "END_TIME", "STATE", + }).AddRow( + 1, "test", "sbtest0", "add index", "write reorganization", 1, 1, 0, time.Now(), nil, time.Now(), "running", + ).AddRow( + 2, "test", "sbtest0", "add index", "write reorganization", 1, 1, 0, time.Now(), time.Now(), time.Now(), "queueing", + ), ) // Case 2: there is no running add index job // Case 3: no permission to query ddl_jobs, TiDB will return empty result @@ -166,3 +172,21 @@ func TestAsyncExecAddIndex(t *testing.T) { require.True(t, time.Since(start) >= 10*time.Second) sink.Close() } + +func TestNeedWaitAsyncExecDone(t *testing.T) { + sink := &DDLSink{ + cfg: &pmysql.Config{ + IsTiDB: false, + }, + } + require.False(t, sink.needWaitAsyncExecDone(timodel.ActionTruncateTable)) + + sink.cfg.IsTiDB = true + require.True(t, sink.needWaitAsyncExecDone(timodel.ActionTruncateTable)) + require.True(t, sink.needWaitAsyncExecDone(timodel.ActionDropTable)) + require.True(t, sink.needWaitAsyncExecDone(timodel.ActionDropIndex)) + + require.False(t, sink.needWaitAsyncExecDone(timodel.ActionCreateTable)) + require.False(t, sink.needWaitAsyncExecDone(timodel.ActionCreateTables)) + require.False(t, sink.needWaitAsyncExecDone(timodel.ActionCreateSchema)) +}