diff --git a/README.md b/README.md index 79abff6..e06d76b 100644 --- a/README.md +++ b/README.md @@ -107,6 +107,7 @@ Spirit requires an account with these privileges: * `ALTER, CREATE, DELETE, DROP, INDEX, INSERT, LOCK TABLES, SELECT, TRIGGER, UPDATE` on the schema where the table is being migrated. * Either `SUPER, REPLICATION SLAVE on *.*` or `REPLICATION CLIENT, REPLICATION SLAVE on *.*`. +* The `RELOAD` privilege. For replica throttling, Spirit requires: diff --git a/pkg/check/privileges.go b/pkg/check/privileges.go index a8e99ce..1e8faaf 100644 --- a/pkg/check/privileges.go +++ b/pkg/check/privileges.go @@ -19,7 +19,7 @@ func init() { func privilegesCheck(ctx context.Context, r Resources, logger loggers.Advanced) error { // This is a re-implementation of the gh-ost check // validateGrants() in gh-ost/go/logic/inspect.go - var foundAll, foundSuper, foundReplicationClient, foundReplicationSlave, foundDBAll bool + var foundAll, foundSuper, foundReplicationClient, foundReplicationSlave, foundDBAll, foundReload bool rows, err := r.DB.QueryContext(ctx, `SHOW GRANTS`) //nolint: execinquery if err != nil { return err @@ -42,6 +42,9 @@ func privilegesCheck(ctx context.Context, r Resources, logger loggers.Advanced) if strings.Contains(grant, `REPLICATION SLAVE`) && strings.Contains(grant, ` ON *.*`) { foundReplicationSlave = true } + if strings.Contains(grant, `RELOAD`) && strings.Contains(grant, ` ON *.*`) { + foundReload = true + } if strings.Contains(grant, fmt.Sprintf("GRANT ALL PRIVILEGES ON `%s`.*", r.Table.SchemaName)) { foundDBAll = true } @@ -64,10 +67,10 @@ func privilegesCheck(ctx context.Context, r Resources, logger loggers.Advanced) if foundSuper && foundReplicationSlave && foundDBAll { return nil } - if foundReplicationClient && foundReplicationSlave && foundDBAll { + if foundReplicationClient && foundReplicationSlave && foundDBAll && foundReload { return nil } - return errors.New("insufficient privileges to run a migration. Needed: SUPER|REPLICATION CLIENT, REPLICATION SLAVE and ALL on %s.*") + return errors.New("insufficient privileges to run a migration. Needed: SUPER|REPLICATION CLIENT, RELOAD, REPLICATION SLAVE and ALL on %s.*") } // stringContainsAll returns true if `s` contains all non empty given `substrings` diff --git a/pkg/check/privileges_test.go b/pkg/check/privileges_test.go index 41319be..769e4d8 100644 --- a/pkg/check/privileges_test.go +++ b/pkg/check/privileges_test.go @@ -46,7 +46,7 @@ func TestPrivileges(t *testing.T) { err = privilegesCheck(context.Background(), r, logrus.New()) assert.Error(t, err) // still not enough, needs replication client - _, err = db.Exec("GRANT REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO testprivsuser") + _, err = db.Exec("GRANT REPLICATION CLIENT, REPLICATION SLAVE, RELOAD ON *.* TO testprivsuser") assert.NoError(t, err) err = privilegesCheck(context.Background(), r, logrus.New()) diff --git a/pkg/repl/client.go b/pkg/repl/client.go index a9241fe..4be9be9 100644 --- a/pkg/repl/client.go +++ b/pkg/repl/client.go @@ -403,15 +403,10 @@ func (c *Client) FlushUnderTableLock(ctx context.Context, lock *dbconn.TableLock if err := c.flush(ctx, true, lock); err != nil { return err } - // TODO: Wait for the changes flushed to be received. - // Ideally we can call c.BlockWait() when - // https://github.com/cashapp/spirit/issues/337 merges. - // For now, we skip block wait. Because the check in AllChangesFlushed() - // now only returns a warning, this is fine. - // if err := c.BlockWait(ctx); err != nil { - // return err - // } - + // Wait for the changes flushed to be received. + if err := c.BlockWait(ctx); err != nil { + return err + } // Do a final flush return c.flush(ctx, true, lock) } @@ -715,36 +710,7 @@ func (c *Client) StartPeriodicFlush(ctx context.Context, interval time.Duration) // **Caveat** Unless you are calling this from Flush(), calling this DOES NOT ensure that // changes have been applied to the database. func (c *Client) BlockWait(ctx context.Context) error { - targetPos, err := c.canal.GetMasterPos() // what the server is at. - if err != nil { - return err - } - for i := 100; ; i++ { - if err := c.injectBinlogNoise(ctx); err != nil { - return err - } - canalPos := c.canal.SyncedPosition() - if i%100 == 0 { - // Print status every 100 loops = 10s - c.logger.Infof("blocking until we have read all binary logs: current-pos=%s target-pos=%s", canalPos, targetPos) - } - if canalPos.Compare(targetPos) >= 0 { - break - } - time.Sleep(100 * time.Millisecond) - } - return nil -} - -// injectBinlogNoise is used to inject some noise into the binlog stream -// This helps ensure that we are "past" a binary log position if there is some off-by-one -// problem where the most recent canal event is not yet updating the canal SyncedPosition, -// and there are no current changes on the MySQL server to advance itself. -// Note: We can not update the table or the newTable, because this intentionally -// causes a panic (c.tableChanged() is called). -func (c *Client) injectBinlogNoise(ctx context.Context) error { - tblName := fmt.Sprintf("_%s_chkpnt", c.table.TableName) - return dbconn.Exec(ctx, c.db, "ALTER TABLE %n.%n AUTO_INCREMENT=0", c.table.SchemaName, tblName) + return c.canal.CatchMasterPos(DefaultTimeout) } func (c *Client) keyHasChanged(key []interface{}, deleted bool) {