Skip to content

Commit

Permalink
refactor cutover for MySQL 8.x rename feature && support OceanBase
Browse files Browse the repository at this point in the history
  • Loading branch information
whhe committed Nov 22, 2024
1 parent 0676116 commit 2bb163c
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 30 deletions.
1 change: 1 addition & 0 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type MigrationContext struct {
GoogleCloudPlatform bool
AzureMySQL bool
AttemptInstantDDL bool
OceanBase bool

// SkipPortValidation allows skipping the port validation in `ValidateConnection`
// This is useful when connecting to a MySQL instance where the external port
Expand Down
31 changes: 30 additions & 1 deletion go/base/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package base

import (
"errors"
"fmt"
"os"
"regexp"
Expand Down Expand Up @@ -62,6 +63,10 @@ func StringContainsAll(s string, substrings ...string) bool {
}

func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig, migrationContext *MigrationContext, name string) (string, error) {
if err := validateOceanBaseConnection(db, migrationContext); err != nil {
return "", err
}

versionQuery := `select @@global.version`

var version string
Expand All @@ -84,7 +89,7 @@ func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig,
// GCP set users port to "NULL", replace it by gh-ost param
// Azure MySQL set users port to a different value by design, replace it by gh-ost para
var port int
if migrationContext.AliyunRDS || migrationContext.GoogleCloudPlatform || migrationContext.AzureMySQL {
if migrationContext.AliyunRDS || migrationContext.GoogleCloudPlatform || migrationContext.AzureMySQL || migrationContext.OceanBase {
port = connectionConfig.Key.Port
} else {
portQuery := `select @@global.port`
Expand All @@ -102,3 +107,27 @@ func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig,
return "", fmt.Errorf("Unexpected database port reported: %+v / extra_port: %+v", port, extraPort)
}
}

func validateOceanBaseConnection(db *gosql.DB, migrationContext *MigrationContext) error {
versionCommentQuery := `select @@global.version_comment`
var versionComment string
if err := db.QueryRow(versionCommentQuery).Scan(&versionComment); err != nil {
return nil

Check failure on line 115 in go/base/utils.go

View workflow job for this annotation

GitHub Actions / lint

error is not nil (line 114) but it returns nil (nilerr)
}
if !strings.Contains(versionComment, "OceanBase") {
return nil
}

migrationContext.Log.Infof("OceanBase connection identified, version_comment: %v", versionComment)
migrationContext.OceanBase = true

enableLockPriorityQuery := `select value from oceanbase.GV$OB_PARAMETERS where name='enable_lock_priority'`
var enableLockPriority bool
if err := db.QueryRow(enableLockPriorityQuery).Scan(&enableLockPriority); err != nil {
return err
}
if !enableLockPriority {
return errors.New("system parameter 'enable_lock_priority' should be true to support cut-over")
}
return nil
}
74 changes: 49 additions & 25 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (this *Applier) InitDBConnections() (err error) {
if err := this.validateAndReadGlobalVariables(); err != nil {
return err
}
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL {
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL && !this.migrationContext.OceanBase {
if impliedKey, err := mysql.GetInstanceKey(this.db); err != nil {
return err
} else {
Expand Down Expand Up @@ -714,24 +714,28 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
return chunkSize, rowsAffected, duration, nil
}

// LockOriginalTable places a write lock on the original table
func (this *Applier) LockOriginalTable() error {
query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
this.migrationContext.Log.Infof("Locking %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
// lockTable places a write lock on the specific table
func (this *Applier) lockTable(databaseName, tableName string) error {
query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`, databaseName, tableName)
this.migrationContext.Log.Infof("Locking %s.%s", databaseName, tableName)
this.migrationContext.LockTablesStartTime = time.Now()
if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
return err
}
this.migrationContext.Log.Infof("Table locked")
this.migrationContext.Log.Infof("Table %s.%s locked", databaseName, tableName)
return nil
}

// LockOriginalTable places a write lock on the original table
func (this *Applier) LockOriginalTable() error {
return this.lockTable(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName)
}

// LockGhostTable places a write lock on the ghost table
func (this *Applier) LockGhostTable() error {
return this.lockTable(this.migrationContext.DatabaseName, this.migrationContext.GetGhostTableName())
}

// UnlockTables makes tea. No wait, it unlocks tables.
func (this *Applier) UnlockTables() error {
query := `unlock /* gh-ost */ tables`
Expand Down Expand Up @@ -1033,7 +1037,7 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke

tableLockTimeoutSeconds := this.migrationContext.CutOverLockTimeoutSeconds * 2
this.migrationContext.Log.Infof("Setting LOCK timeout as %d seconds", tableLockTimeoutSeconds)
query = fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout:=%d`, tableLockTimeoutSeconds)
query = fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout=%d`, tableLockTimeoutSeconds)
if _, err := tx.Exec(query); err != nil {
tableLocked <- err
return err
Expand Down Expand Up @@ -1108,25 +1112,31 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
return nil
}

// AtomicCutoverRename
func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error {
tx, err := this.db.Begin()
func (this *Applier) atomicCutoverRename(db *gosql.DB, sessionIdChan chan int64, tablesRenamed chan<- error) error {
tx, err := db.Begin()
if err != nil {
return err
}
defer func() {
tx.Rollback()
sessionIdChan <- -1
tablesRenamed <- fmt.Errorf("Unexpected error in AtomicCutoverRename(), injected to release blocking channel reads")
if sessionIdChan != nil {
sessionIdChan <- -1
}
if tablesRenamed != nil {
tablesRenamed <- fmt.Errorf("Unexpected error in AtomicCutoverRename(), injected to release blocking channel reads")
}
}()
var sessionId int64
if err := tx.QueryRow(`select /* gh-ost */ connection_id()`).Scan(&sessionId); err != nil {
return err

if sessionIdChan != nil {
var sessionId int64
if err := tx.QueryRow(`select /* gh-ost */ connection_id()`).Scan(&sessionId); err != nil {
return err
}
sessionIdChan <- sessionId
}
sessionIdChan <- sessionId

this.migrationContext.Log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.CutOverLockTimeoutSeconds)
query := fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout:=%d`, this.migrationContext.CutOverLockTimeoutSeconds)
query := fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout=%d`, this.migrationContext.CutOverLockTimeoutSeconds)
if _, err := tx.Exec(query); err != nil {
return err
}
Expand All @@ -1143,14 +1153,28 @@ func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed
)
this.migrationContext.Log.Infof("Issuing and expecting this to block: %s", query)
if _, err := tx.Exec(query); err != nil {
tablesRenamed <- err
if tablesRenamed != nil {
tablesRenamed <- err
}
return this.migrationContext.Log.Errore(err)
}
tablesRenamed <- nil
if tablesRenamed != nil {
tablesRenamed <- nil
}
this.migrationContext.Log.Infof("Tables renamed")
return nil
}

// AtomicCutoverRename renames tables for atomic cut over in non lock session
func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error {
return this.atomicCutoverRename(this.db, sessionIdChan, tablesRenamed)
}

// AtomicCutoverRenameWithLock renames tables for atomic cut over in the lock session
func (this *Applier) AtomicCutoverRenameWithLock() error {
return this.atomicCutoverRename(this.singletonDB, nil, nil)
}

func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error) {
query := fmt.Sprintf(`show /* gh-ost */ global status like '%s'`, variableName)
if err := this.db.QueryRow(query).Scan(&variableName, &result); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go/logic/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (this *Inspector) InitDBConnections() (err error) {
if err := this.validateConnection(); err != nil {
return err
}
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL {
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL && !this.migrationContext.OceanBase {
if impliedKey, err := mysql.GetInstanceKey(this.db); err != nil {
return err
} else {
Expand Down
50 changes: 47 additions & 3 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@ func (this *Migrator) canStopStreaming() bool {

// onChangelogEvent is called when a binlog event operation on the changelog table is intercepted.
func (this *Migrator) onChangelogEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) {
if dmlEvent.NewColumnValues == nil {
// in some compatible systems, such as OceanBase Binlog Service, an UPSERT event is
// converted to a DELETE event and an INSERT event, we need to skip the DELETE event.
return nil
}
// Hey, I created the changelog table, I know the type of columns it has!
switch hint := dmlEvent.NewColumnValues.StringColumn(2); hint {
case "state":
Expand Down Expand Up @@ -562,9 +567,15 @@ func (this *Migrator) cutOver() (err error) {

switch this.migrationContext.CutOverType {
case base.CutOverAtomic:
// Atomic solution: we use low timeout and multiple attempts. But for
// each failed attempt, we throttle until replication lag is back to normal
err = this.atomicCutOver()
if this.migrationContext.OceanBase || !mysql.IsSmallerMinorVersion(this.migrationContext.ApplierMySQLVersion, "8.0.13") {
// Atomic solution for latest MySQL: cut over the tables in the same session where the origin
// table and ghost table are both locked, it can only work on MySQL 8.0.13 or later versions
err = this.atomicCutOverMySQL8()
} else {
// Atomic solution: we use low timeout and multiple attempts. But for
// each failed attempt, we throttle until replication lag is back to normal
err = this.atomicCutOver()
}
case base.CutOverTwoStep:
err = this.cutOverTwoStep()
default:
Expand Down Expand Up @@ -643,6 +654,39 @@ func (this *Migrator) cutOverTwoStep() (err error) {
return nil
}

// atomicCutOverMySQL8 will lock down the original table and the ghost table, execute
// what's left of last DML entries, and atomically swap original->old, then new->original.
// It requires to execute RENAME TABLE when the table is LOCKED under WRITE LOCK, which is
// supported from MySQL 8.0.13, see https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-13.html.
func (this *Migrator) atomicCutOverMySQL8() (err error) {
atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1)
defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0)
atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0)

if err := this.retryOperation(this.applier.LockOriginalTable); err != nil {
return err
}

if err := this.retryOperation(this.waitForEventsUpToLock); err != nil {
return err
}
if err := this.retryOperation(this.applier.LockGhostTable); err != nil {
return err
}

if err := this.applier.AtomicCutoverRenameWithLock(); err != nil {
return err
}
if err := this.retryOperation(this.applier.UnlockTables); err != nil {
return err
}

lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime)
renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime)
this.migrationContext.Log.Debugf("Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName))
return nil
}

// atomicCutOver
func (this *Migrator) atomicCutOver() (err error) {
atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1)
Expand Down
45 changes: 45 additions & 0 deletions go/mysql/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package mysql
import (
gosql "database/sql"
"fmt"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -211,3 +212,47 @@ func Kill(db *gosql.DB, connectionID string) error {
_, err := db.Exec(`KILL QUERY %s`, connectionID)
return err
}

func versionTokens(version string, digits int) []int {
v := strings.Split(version, "-")[0]
tokens := strings.Split(v, ".")
intTokens := make([]int, digits)
for i := range tokens {
if i >= digits {
break
}
intTokens[i], _ = strconv.Atoi(tokens[i])
}
return intTokens
}

func isSmallerVersion(version string, otherVersion string, digits int) bool {
v := versionTokens(version, digits)
o := versionTokens(otherVersion, digits)
for i := 0; i < len(v); i++ {
if v[i] < o[i] {
return true
}
if v[i] > o[i] {
return false
}
if i == digits {
break
}
}
return false
}

// IsSmallerMajorVersion tests two versions against another and returns true if
// the former is a smaller "major" version than the latter.
// e.g. 5.5.36 is NOT a smaller major version as compared to 5.5.40, but IS as compared to 5.6.9
func IsSmallerMajorVersion(version string, otherVersion string) bool {
return isSmallerVersion(version, otherVersion, 2)
}

// IsSmallerMinorVersion tests two versions against another and returns true if
// the former is a smaller "minor" version than the latter.
// e.g. 5.5.36 is a smaller major version as compared to 5.5.40, as well as compared to 5.6.7
func IsSmallerMinorVersion(version string, otherVersion string) bool {
return isSmallerVersion(version, otherVersion, 3)
}

0 comments on commit 2bb163c

Please sign in to comment.