Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use multiStatement to apply DML #1462

Merged
merged 6 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 68 additions & 15 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ import (

"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/binlog"
"github.com/github/gh-ost/go/mysql"
"github.com/github/gh-ost/go/sql"

"github.com/openark/golib/log"
"context"
"database/sql/driver"

"github.com/github/gh-ost/go/mysql"
drivermysql "github.com/go-sql-driver/mysql"
"github.com/openark/golib/sqlutils"
)

Expand Down Expand Up @@ -1207,13 +1210,19 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlB
// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error {
var totalDelta int64
ctx := context.TODO()

err := func() error {
tx, err := this.db.Begin()
conn, err := this.db.Conn(ctx)
if err != nil {
return err
}
defer conn.Close()

tx, err := conn.BeginTx(ctx, nil)
if err != nil {
return err
}
Comment on lines +1229 to +1232
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could make the BEGIN statement the first statement in the multi-statement query and avoid the extra network round trip for it.

SET SESSION can happen before the BEGIN now. The reason it was done inside the transaction was to ensure that it ran on the same connection, but we're now explicitly checking the connection out of the database pool.

rollback := func(err error) error {
tx.Rollback()
return err
Expand All @@ -1225,26 +1234,70 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
if _, err := tx.Exec(sessionQuery); err != nil {
return rollback(err)
}

buildResults := make([]*dmlBuildResult, 0, len(dmlEvents))
for _, dmlEvent := range dmlEvents {
for _, buildResult := range this.buildDMLEventQuery(dmlEvent) {
if buildResult.err != nil {
return rollback(buildResult.err)
}
result, err := tx.Exec(buildResult.query, buildResult.args...)
if err != nil {
err = fmt.Errorf("%w; query=%s; args=%+v", err, buildResult.query, buildResult.args)
return rollback(err)
}

rowsAffected, err := result.RowsAffected()
if err != nil {
log.Warningf("error getting rows affected from DML event query: %s. i'm going to assume that the DML affected a single row, but this may result in inaccurate statistics", err)
rowsAffected = 1
buildResults = append(buildResults, buildResult)
}
}

execErr := conn.Raw(func(driverConn any) error {
ex, ok := driverConn.(driver.ExecerContext)
if !ok {
return fmt.Errorf("could not cast driverConn to ExecerContext")
}

nvc, ok := driverConn.(driver.NamedValueChecker)
if !ok {
return fmt.Errorf("could not cast driverConn to NamedValueChecker")
}
meiji163 marked this conversation as resolved.
Show resolved Hide resolved

var multiArgs []driver.NamedValue
meiji163 marked this conversation as resolved.
Show resolved Hide resolved
multiQueryBuilder := strings.Builder{}
var rowDeltas []int64
meiji163 marked this conversation as resolved.
Show resolved Hide resolved

for _, buildResult := range buildResults {
for _, arg := range buildResult.args {
nv := driver.NamedValue{Value: driver.Value(arg)}
nvc.CheckNamedValue(&nv)
multiArgs = append(multiArgs, nv)
}
// each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1).
// multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event
totalDelta += buildResult.rowsDelta * rowsAffected

multiQueryBuilder.WriteString(buildResult.query)
multiQueryBuilder.WriteString(";\n")

rowDeltas = append(rowDeltas, buildResult.rowsDelta)
}

// this.migrationContext.Log.Infof("Executing query: %s, args: %+v", multiQueryBuilder.String(), multiArgs)
res, err := ex.ExecContext(ctx, multiQueryBuilder.String(), multiArgs)
if err != nil {
err = fmt.Errorf("%w; query=%s; args=%+v", err, multiQueryBuilder.String(), multiArgs)
this.migrationContext.Log.Errorf("Error exec: %+v", err)
return err
}

mysqlRes, ok := res.(drivermysql.Result)
if !ok {
return fmt.Errorf("Could not cast %+v to mysql.Result", res)
}
meiji163 marked this conversation as resolved.
Show resolved Hide resolved

// each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1).
// multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event
for i, rowsAffected := range mysqlRes.AllRowsAffected() {
totalDelta += rowDeltas[i] * rowsAffected
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

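If we add BEGIN to the multi-statement query, the row-delta calculation here will need to be updated accordingly, since the statement results will no longer line up one-to-one with the DML row deltas.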

}

return nil
})

if execErr != nil {
return rollback(execErr)
}
if err := tx.Commit(); err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions go/mysql/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func (this *ConnectionConfig) GetDBUri(databaseName string) string {
connectionParams := []string{
"autocommit=true",
"interpolateParams=true",
"multiStatements=true",
meiji163 marked this conversation as resolved.
Show resolved Hide resolved
fmt.Sprintf("charset=%s", this.Charset),
fmt.Sprintf("tls=%s", tlsOption),
fmt.Sprintf("transaction_isolation=%q", this.TransactionIsolation),
Expand Down
4 changes: 2 additions & 2 deletions go/mysql/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestGetDBUri(t *testing.T) {
c.Charset = "utf8mb4,utf8,latin1"

uri := c.GetDBUri("test")
require.Equal(t, `gromit:penguin@tcp(myhost:3306)/test?autocommit=true&interpolateParams=true&charset=utf8mb4,utf8,latin1&tls=false&transaction_isolation="REPEATABLE-READ"&timeout=1.234500s&readTimeout=1.234500s&writeTimeout=1.234500s`, uri)
require.Equal(t, `gromit:penguin@tcp(myhost:3306)/test?autocommit=true&interpolateParams=true&multiStatements=true&charset=utf8mb4,utf8,latin1&tls=false&transaction_isolation="REPEATABLE-READ"&timeout=1.234500s&readTimeout=1.234500s&writeTimeout=1.234500s`, uri)
}

func TestGetDBUriWithTLSSetup(t *testing.T) {
Expand All @@ -100,5 +100,5 @@ func TestGetDBUriWithTLSSetup(t *testing.T) {
c.Charset = "utf8mb4_general_ci,utf8_general_ci,latin1"

uri := c.GetDBUri("test")
require.Equal(t, `gromit:penguin@tcp(myhost:3306)/test?autocommit=true&interpolateParams=true&charset=utf8mb4_general_ci,utf8_general_ci,latin1&tls=ghost&transaction_isolation="REPEATABLE-READ"&timeout=1.234500s&readTimeout=1.234500s&writeTimeout=1.234500s`, uri)
require.Equal(t, `gromit:penguin@tcp(myhost:3306)/test?autocommit=true&interpolateParams=true&multiStatements=true&charset=utf8mb4_general_ci,utf8_general_ci,latin1&tls=ghost&transaction_isolation="REPEATABLE-READ"&timeout=1.234500s&readTimeout=1.234500s&writeTimeout=1.234500s`, uri)
}