Skip to content

Commit

Permalink
Merge pull request #339 from cashapp/mtocker-inject-noise
Browse files Browse the repository at this point in the history
repl: revert to inject-noise based master wait
  • Loading branch information
kolbe authored Aug 15, 2024
2 parents fef077e + 6354694 commit b369c2a
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 13 deletions.
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.22

require (
github.com/alecthomas/kong v0.7.1
github.com/go-mysql-org/go-mysql v1.8.1-0.20240728143959-24fbb5be92c3
github.com/go-mysql-org/go-mysql v1.8.1-0.20240805131754-ccf204bf2b2a
github.com/go-sql-driver/mysql v1.7.1
github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32
github.com/pingcap/tidb/pkg/parser v0.0.0-20231103042308-035ad5ccbe67
Expand Down Expand Up @@ -40,5 +40,3 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/go-mysql-org/go-mysql => github.com/morgo/go-mysql v0.0.0-20240809144225-c94bbea2a85e
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548/go.mod h1:e6NPNENfs
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-mysql-org/go-mysql v1.8.1-0.20240805131754-ccf204bf2b2a h1:VO6kiE9ex1uNaCCgDz/q0EhTueLrr3vmxkjJpU2x6pk=
github.com/go-mysql-org/go-mysql v1.8.1-0.20240805131754-ccf204bf2b2a/go.mod h1:+SgFgTlqjqOQoMc98n9oyUWEgn2KkOL1VmXDoq2ONOs=
github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=
github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
Expand All @@ -33,8 +35,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/morgo/go-mysql v0.0.0-20240809144225-c94bbea2a85e h1:2qxqEqsL1z1psCF6r6m+2FlW7b1iItFjILzWJiiwqt0=
github.com/morgo/go-mysql v0.0.0-20240809144225-c94bbea2a85e/go.mod h1:+SgFgTlqjqOQoMc98n9oyUWEgn2KkOL1VmXDoq2ONOs=
github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32 h1:m5ZsBa5o/0CkzZXfXLaThzKuR85SnHHetqBCpzQ30h8=
Expand Down
47 changes: 40 additions & 7 deletions pkg/repl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,7 @@ func (c *Client) Run() (err error) {
cfg.Password = c.password
cfg.Logger = NewLogWrapper(c.logger) // wrapper to filter the noise.
cfg.IncludeTableRegex = []string{fmt.Sprintf("^%s\\.%s$", c.table.SchemaName, c.table.TableName)}
cfg.Dump.ExecutionPath = "" // skip dump
cfg.DisableFlushBinlogWhileWaiting = true // can't guarantee privileges exist.
cfg.Dump.ExecutionPath = "" // skip dump
if dbconn.IsRDSHost(cfg.Addr) {
// create a new TLSConfig for RDS
// It needs to be a copy because sharing a global pointer
Expand Down Expand Up @@ -404,10 +403,15 @@ func (c *Client) FlushUnderTableLock(ctx context.Context, lock *dbconn.TableLock
if err := c.flush(ctx, true, lock); err != nil {
return err
}
// Wait for the changes flushed to be received.
if err := c.BlockWait(ctx); err != nil {
return err
}
// TODO: Wait for the changes flushed to be received.
// Ideally we can call c.BlockWait() when
// https://github.com/cashapp/spirit/issues/337 merges.
// For now, we skip block wait. Because the check in AllChangesFlushed()
// now only returns a warning, this is fine.
// if err := c.BlockWait(ctx); err != nil {
// return err
// }

// Do a final flush
return c.flush(ctx, true, lock)
}
Expand Down Expand Up @@ -711,7 +715,36 @@ func (c *Client) StartPeriodicFlush(ctx context.Context, interval time.Duration)
// **Caveat** Unless you are calling this from Flush(), calling this DOES NOT ensure that
// changes have been applied to the database.
func (c *Client) BlockWait(ctx context.Context) error {
return c.canal.CatchMasterPos(DefaultTimeout)
targetPos, err := c.canal.GetMasterPos() // what the server is at.
if err != nil {
return err
}
for i := 100; ; i++ {
if err := c.injectBinlogNoise(ctx); err != nil {
return err
}
canalPos := c.canal.SyncedPosition()
if i%100 == 0 {
// Print status every 100 loops = 10s
c.logger.Infof("blocking until we have read all binary logs: current-pos=%s target-pos=%s", canalPos, targetPos)
}
if canalPos.Compare(targetPos) >= 0 {
break
}
time.Sleep(100 * time.Millisecond)
}
return nil
}

// injectBinlogNoise is used to inject some noise into the binlog stream
// This helps ensure that we are "past" a binary log position if there is some off-by-one
// problem where the most recent canal event is not yet updating the canal SyncedPosition,
// and there are no current changes on the MySQL server to advance itself.
// Note: We can not update the table or the newTable, because this intentionally
// causes a panic (c.tableChanged() is called).
func (c *Client) injectBinlogNoise(ctx context.Context) error {
tblName := fmt.Sprintf("_%s_chkpnt", c.table.TableName)
return dbconn.Exec(ctx, c.db, "ALTER TABLE %n.%n AUTO_INCREMENT=0", c.table.SchemaName, tblName)
}

func (c *Client) keyHasChanged(key []interface{}, deleted bool) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/repl/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,9 +373,10 @@ func TestBlockWait(t *testing.T) {
db, err := dbconn.New(testutils.DSN(), dbconn.NewDBConfig())
assert.NoError(t, err)

testutils.RunSQL(t, "DROP TABLE IF EXISTS blockwaitt1, blockwaitt2")
testutils.RunSQL(t, "DROP TABLE IF EXISTS blockwaitt1, blockwaitt2, _blockwaitt1_chkpnt")
testutils.RunSQL(t, "CREATE TABLE blockwaitt1 (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE blockwaitt2 (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE _blockwaitt1_chkpnt (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")

t1 := table.NewTableInfo(db, "test", "blockwaitt1")
assert.NoError(t, t1.SetInfo(context.TODO()))
Expand Down

0 comments on commit b369c2a

Please sign in to comment.