Skip to content

Commit

Permalink
require RELOAD privilege
Browse files Browse the repository at this point in the history
  • Loading branch information
morgo committed Aug 26, 2024
1 parent b369c2a commit 3f94a9e
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 44 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ Spirit requires an account with these privileges:

* `ALTER, CREATE, DELETE, DROP, INDEX, INSERT, LOCK TABLES, SELECT, TRIGGER, UPDATE` on the schema where the table is being migrated.
* Either `SUPER, REPLICATION SLAVE on *.*` or `REPLICATION CLIENT, REPLICATION SLAVE on *.*`.
* The `RELOAD` privilege.

For replica throttling, Spirit requires:

Expand Down
9 changes: 6 additions & 3 deletions pkg/check/privileges.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func init() {
func privilegesCheck(ctx context.Context, r Resources, logger loggers.Advanced) error {
// This is a re-implementation of the gh-ost check
// validateGrants() in gh-ost/go/logic/inspect.go
var foundAll, foundSuper, foundReplicationClient, foundReplicationSlave, foundDBAll bool
var foundAll, foundSuper, foundReplicationClient, foundReplicationSlave, foundDBAll, foundReload bool
rows, err := r.DB.QueryContext(ctx, `SHOW GRANTS`) //nolint: execinquery
if err != nil {
return err
Expand All @@ -42,6 +42,9 @@ func privilegesCheck(ctx context.Context, r Resources, logger loggers.Advanced)
if strings.Contains(grant, `REPLICATION SLAVE`) && strings.Contains(grant, ` ON *.*`) {
foundReplicationSlave = true
}
if strings.Contains(grant, `RELOAD`) && strings.Contains(grant, ` ON *.*`) {
foundReload = true
}
if strings.Contains(grant, fmt.Sprintf("GRANT ALL PRIVILEGES ON `%s`.*", r.Table.SchemaName)) {
foundDBAll = true
}
Expand All @@ -64,10 +67,10 @@ func privilegesCheck(ctx context.Context, r Resources, logger loggers.Advanced)
if foundSuper && foundReplicationSlave && foundDBAll {
return nil
}
if foundReplicationClient && foundReplicationSlave && foundDBAll {
if foundReplicationClient && foundReplicationSlave && foundDBAll && foundReload {
return nil
}
return errors.New("insufficient privileges to run a migration. Needed: SUPER|REPLICATION CLIENT, REPLICATION SLAVE and ALL on %s.*")
return errors.New("insufficient privileges to run a migration. Needed: SUPER|REPLICATION CLIENT, RELOAD, REPLICATION SLAVE and ALL on %s.*")
}

// stringContainsAll returns true if `s` contains all non empty given `substrings`
Expand Down
2 changes: 1 addition & 1 deletion pkg/check/privileges_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestPrivileges(t *testing.T) {
err = privilegesCheck(context.Background(), r, logrus.New())
assert.Error(t, err) // still not enough, needs replication client

_, err = db.Exec("GRANT REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO testprivsuser")
_, err = db.Exec("GRANT REPLICATION CLIENT, REPLICATION SLAVE, RELOAD ON *.* TO testprivsuser")
assert.NoError(t, err)

err = privilegesCheck(context.Background(), r, logrus.New())
Expand Down
44 changes: 5 additions & 39 deletions pkg/repl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,15 +403,10 @@ func (c *Client) FlushUnderTableLock(ctx context.Context, lock *dbconn.TableLock
if err := c.flush(ctx, true, lock); err != nil {
return err
}
// TODO: Wait for the changes flushed to be received.
// Ideally we can call c.BlockWait() when
// https://github.com/cashapp/spirit/issues/337 merges.
// For now, we skip block wait. Because the check in AllChangesFlushed()
// now only returns a warning, this is fine.
// if err := c.BlockWait(ctx); err != nil {
// return err
// }

// Wait for the changes flushed to be received.
if err := c.BlockWait(ctx); err != nil {
return err
}
// Do a final flush
return c.flush(ctx, true, lock)
}
Expand Down Expand Up @@ -715,36 +710,7 @@ func (c *Client) StartPeriodicFlush(ctx context.Context, interval time.Duration)
// **Caveat** Unless you are calling this from Flush(), calling this DOES NOT ensure that
// changes have been applied to the database.
func (c *Client) BlockWait(ctx context.Context) error {
targetPos, err := c.canal.GetMasterPos() // what the server is at.
if err != nil {
return err
}
for i := 100; ; i++ {
if err := c.injectBinlogNoise(ctx); err != nil {
return err
}
canalPos := c.canal.SyncedPosition()
if i%100 == 0 {
// Print status every 100 loops = 10s
c.logger.Infof("blocking until we have read all binary logs: current-pos=%s target-pos=%s", canalPos, targetPos)
}
if canalPos.Compare(targetPos) >= 0 {
break
}
time.Sleep(100 * time.Millisecond)
}
return nil
}

// injectBinlogNoise is used to inject some noise into the binlog stream
// This helps ensure that we are "past" a binary log position if there is some off-by-one
// problem where the most recent canal event is not yet updating the canal SyncedPosition,
// and there are no current changes on the MySQL server to advance itself.
// Note: We can not update the table or the newTable, because this intentionally
// causes a panic (c.tableChanged() is called).
func (c *Client) injectBinlogNoise(ctx context.Context) error {
tblName := fmt.Sprintf("_%s_chkpnt", c.table.TableName)
return dbconn.Exec(ctx, c.db, "ALTER TABLE %n.%n AUTO_INCREMENT=0", c.table.SchemaName, tblName)
return c.canal.CatchMasterPos(DefaultTimeout)
}

func (c *Client) keyHasChanged(key []interface{}, deleted bool) {
Expand Down
2 changes: 1 addition & 1 deletion scripts/manual_install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ deploy_replication() {
start_mysql primary "$base_port" || return

exec_mysql_sock primary "CREATE USER 'repl'@'%' IDENTIFIED BY 'replica'"
exec_mysql_sock primary "GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%'"
exec_mysql_sock primary "GRANT REPLICATION SLAVE, RELOAD ON *.* TO 'repl'@'%'"

# SOURCE_AUTO_POSITION=1"

Expand Down

0 comments on commit 3f94a9e

Please sign in to comment.