Skip to content

Commit

Permalink
sink, ddl(ticdc): support add index ddl in downstream (#11476) (#11480)
Browse files Browse the repository at this point in the history
close #10682
  • Loading branch information
ti-chi-bot authored Aug 13, 2024
1 parent c7a5513 commit 2f9aad8
Show file tree
Hide file tree
Showing 6 changed files with 391 additions and 131 deletions.
15 changes: 7 additions & 8 deletions cdc/owner/ddl_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,13 @@ func (m *ddlManager) tick(
zap.Uint64("commitTs", nextDDL.CommitTs),
zap.Uint64("checkpointTs", m.checkpointTs))
m.executingDDL = nextDDL
skip, cleanMsg, err := m.shouldSkipDDL(m.executingDDL)
if err != nil {
return nil, nil, errors.Trace(err)
}
if skip {
m.cleanCache(cleanMsg)
}
}
err := m.executeDDL(ctx)
if err != nil {
Expand Down Expand Up @@ -431,14 +438,6 @@ func (m *ddlManager) executeDDL(ctx context.Context) error {
if m.executingDDL == nil {
return nil
}
skip, cleanMsg, err := m.shouldSkipDDL(m.executingDDL)
if err != nil {
return errors.Trace(err)
}
if skip {
m.cleanCache(cleanMsg)
return nil
}

failpoint.Inject("ExecuteNotDone", func() {
// This ddl will never finish executing.
Expand Down
186 changes: 186 additions & 0 deletions cdc/sink/ddlsink/mysql/async_ddl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package mysql

import (
"context"
"database/sql"
"fmt"
"time"

"github.com/pingcap/log"
timodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/errors"
"go.uber.org/zap"
)

var checkRunningAddIndexSQL = `
SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, STATE, QUERY
FROM information_schema.ddl_jobs
WHERE DB_NAME = "%s"
AND TABLE_NAME = "%s"
AND JOB_TYPE LIKE "add index%%"
AND (STATE = "running" OR STATE = "queueing")
LIMIT 1;
`

func (m *DDLSink) shouldAsyncExecDDL(ddl *model.DDLEvent) bool {
return m.cfg.IsTiDB && ddl.Type == timodel.ActionAddIndex
}

// asyncExecDDL executes ddl in async mode.
// this function only works in TiDB, because TiDB will save ddl jobs
// and execute them asynchronously even if ticdc crashed.
func (m *DDLSink) asyncExecDDL(ctx context.Context, ddl *model.DDLEvent) error {
done := make(chan error, 1)
// Use a longer timeout to ensure the add index ddl is sent to tidb before executing the next ddl.
tick := time.NewTimer(10 * time.Second)
defer tick.Stop()
log.Info("async exec add index ddl start",
zap.String("changefeedID", m.id.String()),
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("ddl", ddl.Query))
go func() {
if err := m.execDDLWithMaxRetries(ctx, ddl); err != nil {
log.Error("async exec add index ddl failed",
zap.String("changefeedID", m.id.String()),
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("ddl", ddl.Query))
done <- err
return
}
log.Info("async exec add index ddl done",
zap.String("changefeedID", m.id.String()),
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("ddl", ddl.Query))
done <- nil
}()

select {
case <-ctx.Done():
// if the ddl is canceled, we just return nil, if the ddl is not received by tidb,
// the downstream ddl is lost, because the checkpoint ts is forwarded.
log.Info("async add index ddl exits as canceled",
zap.String("changefeedID", m.id.String()),
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("ddl", ddl.Query))
return nil
case err := <-done:
// if the ddl is executed within 2 seconds, we just return the result to the caller.
return err
case <-tick.C:
// if the ddl is still running, we just return nil,
// then if the ddl is failed, the downstream ddl is lost.
// because the checkpoint ts is forwarded.
log.Info("async add index ddl is still running",
zap.String("changefeedID", m.id.String()),
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("ddl", ddl.Query))
return nil
}
}

// Should always wait for async ddl done before executing the next ddl.
func (m *DDLSink) waitAsynExecDone(ctx context.Context, ddl *model.DDLEvent) {
if !m.cfg.IsTiDB {
return
}

tables := make(map[model.TableName]struct{})
if ddl.TableInfo != nil {
tables[ddl.TableInfo.TableName] = struct{}{}
}
if ddl.PreTableInfo != nil {
tables[ddl.PreTableInfo.TableName] = struct{}{}
}
if len(tables) == 0 || m.checkAsyncExecDDLDone(ctx, tables) {
return
}

log.Debug("wait async exec ddl done",
zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID),
zap.Any("tables", tables),
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("ddl", ddl.Query))
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
done := m.checkAsyncExecDDLDone(ctx, tables)
if done {
return
}
}
}
}

func (m *DDLSink) checkAsyncExecDDLDone(ctx context.Context, tables map[model.TableName]struct{}) bool {
for table := range tables {
done := m.doCheck(ctx, table)
if !done {
return false
}
}
return true
}

func (m *DDLSink) doCheck(ctx context.Context, table model.TableName) (done bool) {
if v, ok := m.lastExecutedNormalDDLCache.Get(table); ok {
ddlType := v.(timodel.ActionType)
if ddlType == timodel.ActionAddIndex {
log.Panic("invalid ddl type in lastExecutedNormalDDLCache",
zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID),
zap.String("ddlType", ddlType.String()))
}
return true
}

ret := m.db.QueryRowContext(ctx, fmt.Sprintf(checkRunningAddIndexSQL, table.Schema, table.Table))
if ret.Err() != nil {
log.Error("check async exec ddl failed",
zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID),
zap.Error(ret.Err()))
return true
}
var jobID, jobType, schemaState, schemaID, tableID, state, query string
if err := ret.Scan(&jobID, &jobType, &schemaState, &schemaID, &tableID, &state, &query); err != nil {
if !errors.Is(err, sql.ErrNoRows) {
log.Error("check async exec ddl failed",
zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID),
zap.Error(err))
}
return true
}

log.Info("async ddl is still running",
zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID),
zap.String("table", table.String()),
zap.String("jobID", jobID),
zap.String("jobType", jobType),
zap.String("schemaState", schemaState),
zap.String("schemaID", schemaID),
zap.String("tableID", tableID),
zap.String("state", state),
zap.String("query", query))
return false
}
168 changes: 168 additions & 0 deletions cdc/sink/ddlsink/mysql/async_ddl_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package mysql

import (
"context"
"database/sql"
"errors"
"fmt"
"net/url"
"sync/atomic"
"testing"
"time"

"github.com/DATA-DOG/go-sqlmock"
timodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
pmysql "github.com/pingcap/tiflow/pkg/sink/mysql"
"github.com/stretchr/testify/require"
)

func TestWaitAsynExecDone(t *testing.T) {
var dbIndex int32 = 0
GetDBConnImpl = func(ctx context.Context, dsnStr string) (*sql.DB, error) {
defer func() {
atomic.AddInt32(&dbIndex, 1)
}()
if atomic.LoadInt32(&dbIndex) == 0 {
// test db
db, err := pmysql.MockTestDB(true)
require.Nil(t, err)
return db, nil
}
// normal db
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
require.Nil(t, err)
mock.ExpectQuery("select tidb_version()").
WillReturnRows(sqlmock.NewRows([]string{"tidb_version()"}).AddRow("5.7.25-TiDB-v4.0.0-beta-191-ga1b3e3b"))

// Case 1: there is a running add index job
mock.ExpectQuery(fmt.Sprintf(checkRunningAddIndexSQL, "test", "sbtest0")).WillReturnRows(
sqlmock.NewRows([]string{"JOB_ID", "JOB_TYPE", "SCHEMA_STATE", "SCHEMA_ID", "TABLE_ID", "STATE", "QUERY"}).
AddRow("1", "add index", "running", "1", "1", "running", "Create index idx1 on test.sbtest0(a)"),
)
// Case 2: there is no running add index job
// Case 3: no permission to query ddl_jobs, TiDB will return empty result
mock.ExpectQuery(fmt.Sprintf(checkRunningAddIndexSQL, "test", "sbtest0")).WillReturnRows(
sqlmock.NewRows(nil),
)
// Case 4: query ddl_jobs failed
mock.ExpectQuery(fmt.Sprintf(checkRunningAddIndexSQL, "test", "sbtest0")).WillReturnError(
errors.New("mock error"),
)

mock.ExpectClose()
return db, nil
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sinkURI, err := url.Parse("mysql://root:@127.0.0.1:4000")
require.NoError(t, err)
replicateCfg := config.GetDefaultReplicaConfig()
ddlSink, err := NewDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicateCfg)
require.NoError(t, err)

table := model.TableName{Schema: "test", Table: "sbtest0"}
tables := make(map[model.TableName]struct{})
tables[table] = struct{}{}

// Test fast path, ddlSink.lastExecutedNormalDDLCache meet panic
ddlSink.lastExecutedNormalDDLCache.Add(table, timodel.ActionAddIndex)
require.Panics(t, func() {
ddlSink.checkAsyncExecDDLDone(ctx, tables)
})

// Test fast path, ddlSink.lastExecutedNormalDDLCache is hit
ddlSink.lastExecutedNormalDDLCache.Add(table, timodel.ActionCreateTable)
done := ddlSink.checkAsyncExecDDLDone(ctx, tables)
require.True(t, done)

// Clenup the cache, always check the async running state
ddlSink.lastExecutedNormalDDLCache.Remove(table)

// Test has running async ddl job
done = ddlSink.checkAsyncExecDDLDone(ctx, tables)
require.False(t, done)

// Test no running async ddl job
done = ddlSink.checkAsyncExecDDLDone(ctx, tables)
require.True(t, done)

// Test ignore error
done = ddlSink.checkAsyncExecDDLDone(ctx, tables)
require.True(t, done)

ddlSink.Close()
}

func TestAsyncExecAddIndex(t *testing.T) {
ddlExecutionTime := time.Second * 15
var dbIndex int32 = 0
GetDBConnImpl = func(ctx context.Context, dsnStr string) (*sql.DB, error) {
defer func() {
atomic.AddInt32(&dbIndex, 1)
}()
if atomic.LoadInt32(&dbIndex) == 0 {
// test db
db, err := pmysql.MockTestDB(true)
require.Nil(t, err)
return db, nil
}
// normal db
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
require.Nil(t, err)
mock.ExpectQuery("select tidb_version()").
WillReturnRows(sqlmock.NewRows([]string{"tidb_version()"}).AddRow("5.7.25-TiDB-v4.0.0-beta-191-ga1b3e3b"))
mock.ExpectBegin()
mock.ExpectExec("USE `test`;").
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("Create index idx1 on test.t1(a)").
WillDelayFor(ddlExecutionTime).
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
mock.ExpectClose()
return db, nil
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sinkURI, err := url.Parse("mysql://127.0.0.1:4000")
require.Nil(t, err)
rc := config.GetDefaultReplicaConfig()
sink, err := NewDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, rc)

require.Nil(t, err)

ddl1 := &model.DDLEvent{
StartTs: 1000,
CommitTs: 1010,
TableInfo: &model.TableInfo{
TableName: model.TableName{
Schema: "test",
Table: "t1",
},
},
Type: timodel.ActionAddIndex,
Query: "Create index idx1 on test.t1(a)",
}
start := time.Now()
err = sink.WriteDDLEvent(ctx, ddl1)
require.Nil(t, err)
require.True(t, time.Since(start) < ddlExecutionTime)
require.True(t, time.Since(start) >= 10*time.Second)
sink.Close()
}
Loading

0 comments on commit 2f9aad8

Please sign in to comment.