diff --git a/go/base/context.go b/go/base/context.go index 2518ecf4e..236ea145d 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -102,6 +102,7 @@ type MigrationContext struct { GoogleCloudPlatform bool AzureMySQL bool AttemptInstantDDL bool + OceanBase bool // SkipPortValidation allows skipping the port validation in `ValidateConnection` // This is useful when connecting to a MySQL instance where the external port diff --git a/go/base/utils.go b/go/base/utils.go index 89f6d315f..d20ca0158 100644 --- a/go/base/utils.go +++ b/go/base/utils.go @@ -6,6 +6,7 @@ package base import ( + "errors" "fmt" "os" "regexp" @@ -62,6 +63,10 @@ func StringContainsAll(s string, substrings ...string) bool { } func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig, migrationContext *MigrationContext, name string) (string, error) { + if err := validateOceanBaseConnection(db, migrationContext); err != nil { + return "", err + } + versionQuery := `select @@global.version` var version string @@ -84,7 +89,7 @@ func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig, // GCP set users port to "NULL", replace it by gh-ost param // Azure MySQL set users port to a different value by design, replace it by gh-ost para var port int - if migrationContext.AliyunRDS || migrationContext.GoogleCloudPlatform || migrationContext.AzureMySQL { + if migrationContext.AliyunRDS || migrationContext.GoogleCloudPlatform || migrationContext.AzureMySQL || migrationContext.OceanBase { port = connectionConfig.Key.Port } else { portQuery := `select @@global.port` @@ -102,3 +107,27 @@ func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig, return "", fmt.Errorf("Unexpected database port reported: %+v / extra_port: %+v", port, extraPort) } } + +func validateOceanBaseConnection(db *gosql.DB, migrationContext *MigrationContext) error { + versionCommentQuery := `select @@global.version_comment` + var versionComment string + if err := db.QueryRow(versionCommentQuery).Scan(&versionComment); err != nil { + return nil + } + if !strings.Contains(versionComment, "OceanBase") { + return nil + } + + migrationContext.Log.Infof("OceanBase connection identified, version_comment: %v", versionComment) + migrationContext.OceanBase = true + + enableLockPriorityQuery := `select value from oceanbase.GV$OB_PARAMETERS where name='enable_lock_priority'` + var enableLockPriority bool + if err := db.QueryRow(enableLockPriorityQuery).Scan(&enableLockPriority); err != nil { + return err + } + if !enableLockPriority { + return errors.New("system parameter 'enable_lock_priority' should be true to support cut-over") + } + return nil +} diff --git a/go/logic/applier.go b/go/logic/applier.go index 1be696909..316db4413 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -100,7 +100,7 @@ func (this *Applier) InitDBConnections() (err error) { if err := this.validateAndReadGlobalVariables(); err != nil { return err } - if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL { + if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL && !this.migrationContext.OceanBase { if impliedKey, err := mysql.GetInstanceKey(this.db); err != nil { return err } else { @@ -714,24 +714,28 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected return chunkSize, rowsAffected, duration, nil } -// LockOriginalTable places a write lock on the original table -func (this *Applier) LockOriginalTable() error { - query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`, - sql.EscapeName(this.migrationContext.DatabaseName), - sql.EscapeName(this.migrationContext.OriginalTableName), - ) - this.migrationContext.Log.Infof("Locking %s.%s", - sql.EscapeName(this.migrationContext.DatabaseName), - sql.EscapeName(this.migrationContext.OriginalTableName), - ) +// lockTable places a write lock on the specific table +func (this *Applier) lockTable(databaseName, tableName string) error { + query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`, databaseName, tableName) + this.migrationContext.Log.Infof("Locking %s.%s", databaseName, tableName) this.migrationContext.LockTablesStartTime = time.Now() if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil { return err } - this.migrationContext.Log.Infof("Table locked") + this.migrationContext.Log.Infof("Table %s.%s locked", databaseName, tableName) return nil } +// LockOriginalTable places a write lock on the original table +func (this *Applier) LockOriginalTable() error { + return this.lockTable(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName) +} + +// LockGhostTable places a write lock on the ghost table +func (this *Applier) LockGhostTable() error { + return this.lockTable(this.migrationContext.DatabaseName, this.migrationContext.GetGhostTableName()) +} + // UnlockTables makes tea. No wait, it unlocks tables. func (this *Applier) UnlockTables() error { query := `unlock /* gh-ost */ tables` @@ -1033,7 +1037,7 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke tableLockTimeoutSeconds := this.migrationContext.CutOverLockTimeoutSeconds * 2 this.migrationContext.Log.Infof("Setting LOCK timeout as %d seconds", tableLockTimeoutSeconds) - query = fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout:=%d`, tableLockTimeoutSeconds) + query = fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout=%d`, tableLockTimeoutSeconds) if _, err := tx.Exec(query); err != nil { tableLocked <- err return err @@ -1108,25 +1112,31 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke return nil } -// AtomicCutoverRename -func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error { - tx, err := this.db.Begin() +func (this *Applier) atomicCutoverRename(db *gosql.DB, sessionIdChan chan int64, tablesRenamed chan<- error) error { + tx, err := db.Begin() if err != nil { return err } defer func() { tx.Rollback() - sessionIdChan <- -1 - tablesRenamed <- fmt.Errorf("Unexpected error in AtomicCutoverRename(), injected to release blocking channel reads") + if sessionIdChan != nil { + sessionIdChan <- -1 + } + if tablesRenamed != nil { + tablesRenamed <- fmt.Errorf("Unexpected error in AtomicCutoverRename(), injected to release blocking channel reads") + } }() - var sessionId int64 - if err := tx.QueryRow(`select /* gh-ost */ connection_id()`).Scan(&sessionId); err != nil { - return err + + if sessionIdChan != nil { + var sessionId int64 + if err := tx.QueryRow(`select /* gh-ost */ connection_id()`).Scan(&sessionId); err != nil { + return err + } + sessionIdChan <- sessionId } - sessionIdChan <- sessionId this.migrationContext.Log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.CutOverLockTimeoutSeconds) - query := fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout:=%d`, this.migrationContext.CutOverLockTimeoutSeconds) + query := fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout=%d`, this.migrationContext.CutOverLockTimeoutSeconds) if _, err := tx.Exec(query); err != nil { return err } @@ -1143,14 +1153,28 @@ func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed ) this.migrationContext.Log.Infof("Issuing and expecting this to block: %s", query) if _, err := tx.Exec(query); err != nil { - tablesRenamed <- err + if tablesRenamed != nil { + tablesRenamed <- err + } return this.migrationContext.Log.Errore(err) } - tablesRenamed <- nil + if tablesRenamed != nil { + tablesRenamed <- nil + } this.migrationContext.Log.Infof("Tables renamed") return nil } +// AtomicCutoverRename renames tables for atomic cut over in non lock session +func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error { + return this.atomicCutoverRename(this.db, sessionIdChan, tablesRenamed) +} + +// AtomicCutoverRenameWithLock renames tables for atomic cut over in the lock session +func (this *Applier) AtomicCutoverRenameWithLock() error { + return this.atomicCutoverRename(this.singletonDB, nil, nil) +} + func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error) { query := fmt.Sprintf(`show /* gh-ost */ global status like '%s'`, variableName) if err := this.db.QueryRow(query).Scan(&variableName, &result); err != nil { diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 9d414a43e..d9325a316 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -56,7 +56,7 @@ func (this *Inspector) InitDBConnections() (err error) { if err := this.validateConnection(); err != nil { return err } - if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL { + if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL && !this.migrationContext.OceanBase { if impliedKey, err := mysql.GetInstanceKey(this.db); err != nil { return err } else { diff --git a/go/logic/migrator.go b/go/logic/migrator.go index b6bf3fc5e..0966bcfa3 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -200,6 +200,11 @@ func (this *Migrator) canStopStreaming() bool { // onChangelogEvent is called when a binlog event operation on the changelog table is intercepted. func (this *Migrator) onChangelogEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) { + if dmlEvent.NewColumnValues == nil { + // in some compatible systems, such as OceanBase Binlog Service, an UPSERT event is + // converted to a DELETE event and an INSERT event, we need to skip the DELETE event. + return nil + } // Hey, I created the changelog table, I know the type of columns it has! switch hint := dmlEvent.NewColumnValues.StringColumn(2); hint { case "state": @@ -562,9 +567,15 @@ func (this *Migrator) cutOver() (err error) { switch this.migrationContext.CutOverType { case base.CutOverAtomic: - // Atomic solution: we use low timeout and multiple attempts. But for - // each failed attempt, we throttle until replication lag is back to normal - err = this.atomicCutOver() + if this.migrationContext.OceanBase || !mysql.IsSmallerMinorVersion(this.migrationContext.ApplierMySQLVersion, "8.0.13") { + // Atomic solution for latest MySQL: cut over the tables in the same session where the origin + // table and ghost table are both locked, it can only work on MySQL 8.0.13 or later versions + err = this.atomicCutOverMySQL8() + } else { + // Atomic solution: we use low timeout and multiple attempts. But for + // each failed attempt, we throttle until replication lag is back to normal + err = this.atomicCutOver() + } case base.CutOverTwoStep: err = this.cutOverTwoStep() default: @@ -643,6 +654,39 @@ func (this *Migrator) cutOverTwoStep() (err error) { return nil } +// atomicCutOverMySQL8 will lock down the original table and the ghost table, execute +// what's left of last DML entries, and atomically swap original->old, then new->original. +// It requires to execute RENAME TABLE when the table is LOCKED under WRITE LOCK, which is +// supported from MySQL 8.0.13, see https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-13.html. +func (this *Migrator) atomicCutOverMySQL8() (err error) { + atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1) + defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0) + atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0) + + if err := this.retryOperation(this.applier.LockOriginalTable); err != nil { + return err + } + + if err := this.retryOperation(this.waitForEventsUpToLock); err != nil { + return err + } + if err := this.retryOperation(this.applier.LockGhostTable); err != nil { + return err + } + + if err := this.applier.AtomicCutoverRenameWithLock(); err != nil { + return err + } + if err := this.retryOperation(this.applier.UnlockTables); err != nil { + return err + } + + lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime) + renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime) + this.migrationContext.Log.Debugf("Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName)) + return nil +} + // atomicCutOver func (this *Migrator) atomicCutOver() (err error) { atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1) diff --git a/go/mysql/utils.go b/go/mysql/utils.go index c69a3f255..8315eb0f5 100644 --- a/go/mysql/utils.go +++ b/go/mysql/utils.go @@ -8,6 +8,7 @@ package mysql import ( gosql "database/sql" "fmt" + "strconv" "strings" "sync" "time" @@ -211,3 +212,47 @@ func Kill(db *gosql.DB, connectionID string) error { _, err := db.Exec(`KILL QUERY %s`, connectionID) return err } + +func versionTokens(version string, digits int) []int { + v := strings.Split(version, "-")[0] + tokens := strings.Split(v, ".") + intTokens := make([]int, digits) + for i := range tokens { + if i >= digits { + break + } + intTokens[i], _ = strconv.Atoi(tokens[i]) + } + return intTokens +} + +func isSmallerVersion(version string, otherVersion string, digits int) bool { + v := versionTokens(version, digits) + o := versionTokens(otherVersion, digits) + for i := 0; i < len(v); i++ { + if v[i] < o[i] { + return true + } + if v[i] > o[i] { + return false + } + if i == digits { + break + } + } + return false +} + +// IsSmallerMajorVersion tests two versions against another and returns true if +// the former is a smaller "major" version than the latter. +// e.g. 5.5.36 is NOT a smaller major version as compared to 5.5.40, but IS as compared to 5.6.9 +func IsSmallerMajorVersion(version string, otherVersion string) bool { + return isSmallerVersion(version, otherVersion, 2) +} + +// IsSmallerMinorVersion tests two versions against another and returns true if +// the former is a smaller "minor" version than the latter. +// e.g. 5.5.36 is a smaller major version as compared to 5.5.40, as well as compared to 5.6.7 +func IsSmallerMinorVersion(version string, otherVersion string) bool { + return isSmallerVersion(version, otherVersion, 3) +}